From a0e9736eb69b9d8fd683eda833081b1d0d183a4a Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 3 Sep 2024 12:54:08 +0400 Subject: [PATCH 1/3] netmap: Fix typo in method docs Signed-off-by: Leonard Lyubich --- netmap/policy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netmap/policy.go b/netmap/policy.go index 10ea8b99..a2e6b773 100644 --- a/netmap/policy.go +++ b/netmap/policy.go @@ -184,7 +184,7 @@ func (p *PlacementPolicy) ReadFromV2(m netmap.PlacementPolicy) error { return p.readFromV2(m, true) } -// WriteToV2 writes PlacementPolicy to the session.Token message. +// WriteToV2 writes PlacementPolicy to the netmap.PlacementPolicy message. // The message must not be nil. // // See also ReadFromV2. From 5cdc80de8cf7323d808e8ee9df63393b45fe42a2 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 3 Sep 2024 12:55:22 +0400 Subject: [PATCH 2/3] eacl: Fix `NewTable` function docs `Table` contains no session token and signature. Signed-off-by: Leonard Lyubich --- eacl/table.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/eacl/table.go b/eacl/table.go index 9a1d5d4a..65f05067 100644 --- a/eacl/table.go +++ b/eacl/table.go @@ -183,9 +183,7 @@ func (t Table) ToV2() *v2acl.Table { // Defaults: // - version: version.Current(); // - container ID: nil; -// - records: nil; -// - session token: nil; -// - signature: nil. +// - records: nil. // // Deprecated: use [ConstructTable] instead. func NewTable() *Table { From 12a4bf2d18d8a26cb0ecdac080dc1dfc72345908 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 6 Sep 2024 19:03:59 +0400 Subject: [PATCH 3/3] pool: Do not open no-op sessions with storage nodes Currently, there are only two NeoFS operations supporting sessions: object creation (PUT) and removal (DELETE). The latter is essentially creation of the tombstone object. Sessions are used in cases when client requests trusted storage node to form an object and sign it via so-called private session key. Public part of this key is signed by original client in the attached session token. This makes no sense for any other op because server does nothing on behalf of the client, it just executes his request. Previously, pool of NeoFS clients could open session for any object op incl. reading ones. As said, this broke nothing, but was completely redundant. This drops session opening for any op except PUT/DELETE to increase pool's efficiency and relieve network pressure. Signed-off-by: Leonard Lyubich --- pool/mock_test.go | 8 +- pool/object.go | 83 +------ pool/object_test.go | 555 ++++++++++++++++++++++++++++++++++++++++++++ pool/pool_test.go | 31 ++- 4 files changed, 586 insertions(+), 91 deletions(-) create mode 100644 pool/object_test.go diff --git a/pool/mock_test.go b/pool/mock_test.go index 5f776564..766e4868 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -37,6 +37,7 @@ type mockClient struct { errorOnEndpointInfo bool errorOnNetworkInfo bool errOnGetObject error + errOnPutObject error } func (m *mockClient) Dial(_ client.PrmDial) error { @@ -116,8 +117,7 @@ func (m *mockClient) NetMapSnapshot(_ context.Context, _ client.PrmNetMapSnapsho } func (m *mockClient) ObjectPutInit(_ context.Context, _ object.Object, _ user.Signer, _ client.PrmObjectPutInit) (client.ObjectWriter, error) { - // TODO implement me - panic("implement me") + return nil, m.errOnPutObject } func (m *mockClient) ObjectGetInit(_ context.Context, _ cid.ID, _ oid.ID, _ user.Signer, _ client.PrmObjectGet) (object.Object, *client.PayloadReader, error) { @@ -214,6 +214,10 @@ func (m *mockClient) statusOnGetObject(err error) { m.errOnGetObject = err } +func (m *mockClient) statusOnPutObject(err error) { + m.errOnPutObject = err +} + func (m *mockClient) dial(context.Context) error { if m.errorOnDial { return errors.New("dial error") diff --git a/pool/object.go b/pool/object.go index baf73917..24301598 100644 --- a/pool/object.go +++ b/pool/object.go @@ -65,26 +65,11 @@ func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, signer user // // See details in [client.Client.ObjectGetInit]. func (p *Pool) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error) { - var hdr object.Object c, err := p.sdkClient() if err != nil { - return hdr, nil, err + return object.Object{}, nil, err } - if err = p.withinContainerSession( - ctx, - c, - containerID, - p.actualSigner(signer), - session.VerbObjectGet, - &prm, - ); err != nil { - return hdr, nil, fmt.Errorf("session: %w", err) - } - - hdr, payloadReader, err := c.ObjectGetInit(ctx, containerID, objectID, signer, prm) - p.checkSessionTokenErr(err, c.addr, c.nodeSession) - - return hdr, payloadReader, err + return c.ObjectGetInit(ctx, containerID, objectID, signer, prm) } // ObjectHead reads object header through a remote server using NeoFS API protocol. @@ -97,21 +82,7 @@ func (p *Pool) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid. if err != nil { return nil, err } - if err = p.withinContainerSession( - ctx, - c, - containerID, - p.actualSigner(signer), - session.VerbObjectHead, - &prm, - ); err != nil { - return nil, fmt.Errorf("session: %w", err) - } - - hdr, err := c.ObjectHead(ctx, containerID, objectID, signer, prm) - p.checkSessionTokenErr(err, c.addr, c.nodeSession) - - return hdr, err + return c.ObjectHead(ctx, containerID, objectID, signer, prm) } // ObjectRangeInit initiates reading an object's payload range through a remote @@ -124,21 +95,7 @@ func (p *Pool) ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID if err != nil { return nil, err } - if err = p.withinContainerSession( - ctx, - c, - containerID, - p.actualSigner(signer), - session.VerbObjectRange, - &prm, - ); err != nil { - return nil, fmt.Errorf("session: %w", err) - } - - reader, err := c.ObjectRangeInit(ctx, containerID, objectID, offset, length, signer, prm) - p.checkSessionTokenErr(err, c.addr, c.nodeSession) - - return reader, err + return c.ObjectRangeInit(ctx, containerID, objectID, offset, length, signer, prm) } // ObjectDelete marks an object for deletion from the container using NeoFS API protocol. @@ -178,21 +135,7 @@ func (p *Pool) ObjectHash(ctx context.Context, containerID cid.ID, objectID oid. if err != nil { return [][]byte{}, err } - if err = p.withinContainerSession( - ctx, - c, - containerID, - p.actualSigner(signer), - session.VerbObjectRangeHash, - &prm, - ); err != nil { - return [][]byte{}, fmt.Errorf("session: %w", err) - } - - data, err := c.ObjectHash(ctx, containerID, objectID, signer, prm) - p.checkSessionTokenErr(err, c.addr, c.nodeSession) - - return data, err + return c.ObjectHash(ctx, containerID, objectID, signer, prm) } // ObjectSearchInit initiates object selection through a remote server using NeoFS API protocol. @@ -205,19 +148,5 @@ func (p *Pool) ObjectSearchInit(ctx context.Context, containerID cid.ID, signer if err != nil { return nil, err } - if err = p.withinContainerSession( - ctx, - c, - containerID, - p.actualSigner(signer), - session.VerbObjectSearch, - &prm, - ); err != nil { - return nil, fmt.Errorf("session: %w", err) - } - - reader, err := c.ObjectSearchInit(ctx, containerID, signer, prm) - p.checkSessionTokenErr(err, c.addr, c.nodeSession) - - return reader, err + return c.ObjectSearchInit(ctx, containerID, signer, prm) } diff --git a/pool/object_test.go b/pool/object_test.go new file mode 100644 index 00000000..786fbdb9 --- /dev/null +++ b/pool/object_test.go @@ -0,0 +1,555 @@ +package pool + +import ( + "context" + "errors" + "fmt" + "slices" + "testing" + + "github.com/nspcc-dev/neofs-sdk-go/accounting" + bearertest "github.com/nspcc-dev/neofs-sdk-go/bearer/test" + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/container" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/eacl" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" + "github.com/nspcc-dev/neofs-sdk-go/session" + sessiontest "github.com/nspcc-dev/neofs-sdk-go/session/test" + "github.com/nspcc-dev/neofs-sdk-go/user" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type noOtherClientCalls struct{} + +func (noOtherClientCalls) Dial(client.PrmDial) error { panic("must not be called") } + +func (noOtherClientCalls) BalanceGet(context.Context, client.PrmBalanceGet) (accounting.Decimal, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ContainerPut(context.Context, container.Container, neofscrypto.Signer, client.PrmContainerPut) (cid.ID, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ContainerGet(context.Context, cid.ID, client.PrmContainerGet) (container.Container, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ContainerList(context.Context, user.ID, client.PrmContainerList) ([]cid.ID, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ContainerDelete(context.Context, cid.ID, neofscrypto.Signer, client.PrmContainerDelete) error { + panic("must not be called") +} + +func (noOtherClientCalls) ContainerEACL(context.Context, cid.ID, client.PrmContainerEACL) (eacl.Table, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ContainerSetEACL(context.Context, eacl.Table, user.Signer, client.PrmContainerSetEACL) error { + panic("must not be called") +} + +func (noOtherClientCalls) NetworkInfo(context.Context, client.PrmNetworkInfo) (netmap.NetworkInfo, error) { + panic("must not be called") +} + +func (noOtherClientCalls) NetMapSnapshot(context.Context, client.PrmNetMapSnapshot) (netmap.NetMap, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ObjectPutInit(context.Context, object.Object, user.Signer, client.PrmObjectPutInit) (client.ObjectWriter, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ObjectGetInit(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectGet) (object.Object, *client.PayloadReader, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ObjectHead(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectHead) (*object.Object, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ObjectRangeInit(context.Context, cid.ID, oid.ID, uint64, uint64, user.Signer, client.PrmObjectRange) (*client.ObjectRangeReader, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ObjectDelete(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectDelete) (oid.ID, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ObjectHash(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectHash) ([][]byte, error) { + panic("must not be called") +} + +func (noOtherClientCalls) ObjectSearchInit(context.Context, cid.ID, user.Signer, client.PrmObjectSearch) (*client.ObjectListReader, error) { + panic("must not be called") +} + +func (noOtherClientCalls) SessionCreate(context.Context, user.Signer, client.PrmSessionCreate) (*client.ResSessionCreate, error) { + panic("must not be called") +} + +func (noOtherClientCalls) EndpointInfo(context.Context, client.PrmEndpointInfo) (*client.ResEndpointInfo, error) { + panic("must not be called") +} + +type mockedClientWrapper struct { + addr string +} + +func (x mockedClientWrapper) isHealthy() bool { return true } +func (x mockedClientWrapper) setUnhealthy() { panic("must not be called") } +func (x mockedClientWrapper) address() string { return x.addr } +func (x mockedClientWrapper) currentErrorRate() uint32 { panic("must not be called") } +func (x mockedClientWrapper) overallErrorRate() uint64 { panic("must not be called") } +func (x mockedClientWrapper) SetNodeSession(*session.Object, neofscrypto.PublicKey) { + panic("must not be called") +} +func (x mockedClientWrapper) GetNodeSession(neofscrypto.PublicKey) *session.Object { + panic("must not be called") +} +func (x mockedClientWrapper) ResetSessions() { panic("must not be called") } +func (x mockedClientWrapper) dial(context.Context) error { return nil } +func (x mockedClientWrapper) restartIfUnhealthy(context.Context) (bool, bool) { + panic("must not be called") +} +func (x mockedClientWrapper) getClient() (sdkClientInterface, error) { panic("must not be called") } +func (x mockedClientWrapper) getRawClient() (*client.Client, error) { panic("must not be called") } + +type objectGetOnlyClient struct { + noOtherClientCalls + // expected input + cnr cid.ID + objID oid.ID + sgnr user.Signer + opts client.PrmObjectGet + // ret + hdr object.Object + pld *client.PayloadReader + err error +} + +func (x objectGetOnlyClient) ObjectGetInit(ctx context.Context, cnr cid.ID, objID oid.ID, signer user.Signer, opts client.PrmObjectGet) (object.Object, *client.PayloadReader, error) { + switch { + case ctx == nil: + return object.Object{}, nil, errors.New("[test] nil context") + case cnr != x.cnr: + return object.Object{}, nil, errors.New("[test] wrong container") + case objID != x.objID: + return object.Object{}, nil, errors.New("[test] wrong object ID") + case !assert.ObjectsAreEqual(signer, x.sgnr): + return object.Object{}, nil, errors.New("[test] wrong signer") + case !assert.ObjectsAreEqual(opts, x.opts): + return object.Object{}, nil, errors.New("[test] wrong options") + } + return x.hdr, x.pld, x.err +} + +type objectGetOnlyClientWrapper struct { + mockedClientWrapper + c objectGetOnlyClient +} + +func (x objectGetOnlyClientWrapper) getClient() (sdkClientInterface, error) { return x.c, nil } + +func TestPool_ObjectGetInit(t *testing.T) { + ctx := context.Background() + cnrID := cidtest.ID() + objID := oidtest.ID() + usr := usertest.User() + + var getOpts client.PrmObjectGet + getOpts.WithinSession(sessiontest.Object()) + getOpts.WithBearerToken(bearertest.Token()) + getOpts.MarkRaw() + getOpts.MarkLocal() + getOpts.WithXHeaders("k1", "v1", "k2", "v2") + + getClient := objectGetOnlyClient{ + cnr: cnrID, + objID: objID, + sgnr: usr, + opts: getOpts, + hdr: objecttest.Object(), + pld: nil, // no way to construct + err: errors.New("any error"), + } + endpoints := []string{"localhost:8080", "localhost:8081"} + nodes := make([]NodeParam, len(endpoints)) + cws := make([]objectGetOnlyClientWrapper, len(endpoints)) + for i := range endpoints { + nodes[i].address = endpoints[i] + cws[i].addr = endpoints[i] + cws[i].c = getClient + } + + var poolOpts InitParameters + poolOpts.setClientBuilder(func(endpoint string) (internalClient, error) { + ind := slices.Index(endpoints, endpoint) + if ind < 0 { + return nil, fmt.Errorf("unexpected endpoint %q", endpoint) + } + return &cws[ind], nil + }) + p, err := New(nodes, usertest.User().RFC6979, poolOpts) + require.NoError(t, err) + require.NoError(t, p.Dial(ctx)) + t.Cleanup(p.Close) + + hdr, pld, err := p.ObjectGetInit(context.Background(), cnrID, objID, usr, getOpts) + require.Equal(t, err, getClient.err) + require.Equal(t, hdr, getClient.hdr) + require.Equal(t, pld, getClient.pld) +} + +type objectHeadOnlyClient struct { + noOtherClientCalls + // expected input + cnr cid.ID + objID oid.ID + sgnr user.Signer + opts client.PrmObjectHead + // ret + hdr object.Object + err error +} + +func (x objectHeadOnlyClient) ObjectHead(ctx context.Context, cnr cid.ID, objID oid.ID, signer user.Signer, opts client.PrmObjectHead) (*object.Object, error) { + switch { + case ctx == nil: + return nil, errors.New("[test] nil context") + case cnr != x.cnr: + return nil, errors.New("[test] wrong container") + case objID != x.objID: + return nil, errors.New("[test] wrong object ID") + case !assert.ObjectsAreEqual(signer, x.sgnr): + return nil, errors.New("[test] wrong signer") + case !assert.ObjectsAreEqual(opts, x.opts): + return nil, errors.New("[test] wrong options") + } + return &x.hdr, x.err +} + +type objectHeadOnlyClientWrapper struct { + mockedClientWrapper + c objectHeadOnlyClient +} + +func (x objectHeadOnlyClientWrapper) getClient() (sdkClientInterface, error) { return x.c, nil } + +func TestPool_ObjectHead(t *testing.T) { + ctx := context.Background() + cnrID := cidtest.ID() + objID := oidtest.ID() + usr := usertest.User() + + var headOpts client.PrmObjectHead + headOpts.WithinSession(sessiontest.Object()) + headOpts.WithBearerToken(bearertest.Token()) + headOpts.MarkRaw() + headOpts.MarkLocal() + headOpts.WithXHeaders("k1", "v1", "k2", "v2") + + headClient := objectHeadOnlyClient{ + cnr: cnrID, + objID: objID, + sgnr: usr, + opts: headOpts, + hdr: objecttest.Object(), + err: errors.New("any error"), + } + endpoints := []string{"localhost:8080", "localhost:8081"} + nodes := make([]NodeParam, len(endpoints)) + cws := make([]objectHeadOnlyClientWrapper, len(endpoints)) + for i := range endpoints { + nodes[i].address = endpoints[i] + cws[i].addr = endpoints[i] + cws[i].c = headClient + } + + var poolOpts InitParameters + poolOpts.setClientBuilder(func(endpoint string) (internalClient, error) { + ind := slices.Index(endpoints, endpoint) + if ind < 0 { + return nil, fmt.Errorf("unexpected endpoint %q", endpoint) + } + return &cws[ind], nil + }) + p, err := New(nodes, usertest.User().RFC6979, poolOpts) + require.NoError(t, err) + require.NoError(t, p.Dial(ctx)) + t.Cleanup(p.Close) + + hdr, err := p.ObjectHead(context.Background(), cnrID, objID, usr, headOpts) + require.Equal(t, err, headClient.err) + require.Equal(t, hdr, &headClient.hdr) +} + +type objectRangeOnlyClient struct { + noOtherClientCalls + // expected input + cnr cid.ID + objID oid.ID + off, ln uint64 + sgnr user.Signer + opts client.PrmObjectRange + // ret + pld *client.ObjectRangeReader + err error +} + +func (x objectRangeOnlyClient) ObjectRangeInit(ctx context.Context, cnr cid.ID, objID oid.ID, off, ln uint64, signer user.Signer, opts client.PrmObjectRange) (*client.ObjectRangeReader, error) { + switch { + case ctx == nil: + return nil, errors.New("[test] nil context") + case cnr != x.cnr: + return nil, errors.New("[test] wrong container") + case objID != x.objID: + return nil, errors.New("[test] wrong object ID") + case off != x.off: + return nil, errors.New("[test] wrong range offset") + case ln != x.ln: + return nil, errors.New("[test] wrong range length") + case !assert.ObjectsAreEqual(signer, x.sgnr): + return nil, errors.New("[test] wrong signer") + case !assert.ObjectsAreEqual(opts, x.opts): + return nil, errors.New("[test] wrong options") + } + return x.pld, x.err +} + +type objectRangeOnlyClientWrapper struct { + mockedClientWrapper + c objectRangeOnlyClient +} + +func (x objectRangeOnlyClientWrapper) getClient() (sdkClientInterface, error) { return x.c, nil } + +func TestPool_ObjectRangeInit(t *testing.T) { + ctx := context.Background() + cnrID := cidtest.ID() + objID := oidtest.ID() + const off, ln = 13, 42 + usr := usertest.User() + + var rangeOpts client.PrmObjectRange + rangeOpts.WithinSession(sessiontest.Object()) + rangeOpts.WithBearerToken(bearertest.Token()) + rangeOpts.MarkRaw() + rangeOpts.MarkLocal() + rangeOpts.WithXHeaders("k1", "v1", "k2", "v2") + + rangeClient := objectRangeOnlyClient{ + cnr: cnrID, + objID: objID, + off: off, + ln: ln, + sgnr: usr, + opts: rangeOpts, + pld: nil, // no way to construct + err: errors.New("any error"), + } + endpoints := []string{"localhost:8080", "localhost:8081"} + nodes := make([]NodeParam, len(endpoints)) + cws := make([]objectRangeOnlyClientWrapper, len(endpoints)) + for i := range endpoints { + nodes[i].address = endpoints[i] + cws[i].addr = endpoints[i] + cws[i].c = rangeClient + } + + var poolOpts InitParameters + poolOpts.setClientBuilder(func(endpoint string) (internalClient, error) { + ind := slices.Index(endpoints, endpoint) + if ind < 0 { + return nil, fmt.Errorf("unexpected endpoint %q", endpoint) + } + return &cws[ind], nil + }) + p, err := New(nodes, usertest.User().RFC6979, poolOpts) + require.NoError(t, err) + require.NoError(t, p.Dial(ctx)) + t.Cleanup(p.Close) + + pld, err := p.ObjectRangeInit(context.Background(), cnrID, objID, off, ln, usr, rangeOpts) + require.Equal(t, err, rangeClient.err) + require.Equal(t, pld, rangeClient.pld) +} + +type objectHashOnlyClient struct { + noOtherClientCalls + // expected input + cnr cid.ID + objID oid.ID + sgnr user.Signer + opts client.PrmObjectHash + // ret + hs [][]byte + err error +} + +func (x objectHashOnlyClient) ObjectHash(ctx context.Context, cnr cid.ID, objID oid.ID, signer user.Signer, opts client.PrmObjectHash) ([][]byte, error) { + switch { + case ctx == nil: + return nil, errors.New("[test] nil context") + case cnr != x.cnr: + return nil, errors.New("[test] wrong container") + case objID != x.objID: + return nil, errors.New("[test] wrong object ID") + case !assert.ObjectsAreEqual(signer, x.sgnr): + return nil, errors.New("[test] wrong signer") + case !assert.ObjectsAreEqual(opts, x.opts): + return nil, errors.New("[test] wrong options") + } + return x.hs, x.err +} + +type objectHashOnlyClientWrapper struct { + mockedClientWrapper + c objectHashOnlyClient +} + +func (x objectHashOnlyClientWrapper) getClient() (sdkClientInterface, error) { return x.c, nil } + +func TestPool_ObjectHash(t *testing.T) { + ctx := context.Background() + cnrID := cidtest.ID() + objID := oidtest.ID() + usr := usertest.User() + + var hashOpts client.PrmObjectHash + hashOpts.WithinSession(sessiontest.Object()) + hashOpts.WithBearerToken(bearertest.Token()) + hashOpts.MarkLocal() + hashOpts.WithXHeaders("k1", "v1", "k2", "v2") + hashOpts.TillichZemorAlgo() + hashOpts.SetRangeList(1, 2, 3, 4) + hashOpts.UseSalt([]byte("any_salt")) + + hashClient := objectHashOnlyClient{ + cnr: cnrID, + objID: objID, + sgnr: usr, + opts: hashOpts, + hs: [][]byte{[]byte("hash1"), []byte("hash2")}, + err: errors.New("any error"), + } + endpoints := []string{"localhost:8080", "localhost:8081"} + nodes := make([]NodeParam, len(endpoints)) + cws := make([]objectHashOnlyClientWrapper, len(endpoints)) + for i := range endpoints { + nodes[i].address = endpoints[i] + cws[i].addr = endpoints[i] + cws[i].c = hashClient + } + + var poolOpts InitParameters + poolOpts.setClientBuilder(func(endpoint string) (internalClient, error) { + ind := slices.Index(endpoints, endpoint) + if ind < 0 { + return nil, fmt.Errorf("unexpected endpoint %q", endpoint) + } + return &cws[ind], nil + }) + p, err := New(nodes, usertest.User().RFC6979, poolOpts) + require.NoError(t, err) + require.NoError(t, p.Dial(ctx)) + t.Cleanup(p.Close) + + hs, err := p.ObjectHash(context.Background(), cnrID, objID, usr, hashOpts) + require.Equal(t, err, hashClient.err) + require.Equal(t, hs, hashClient.hs) +} + +type objectSearchOnlyClient struct { + noOtherClientCalls + // expected input + cnr cid.ID + sgnr user.Signer + opts client.PrmObjectSearch + // ret + rdr *client.ObjectListReader + err error +} + +func (x objectSearchOnlyClient) ObjectSearchInit(ctx context.Context, cnr cid.ID, signer user.Signer, opts client.PrmObjectSearch) (*client.ObjectListReader, error) { + switch { + case ctx == nil: + return nil, errors.New("[test] nil context") + case cnr != x.cnr: + return nil, errors.New("[test] wrong container") + case !assert.ObjectsAreEqual(signer, x.sgnr): + return nil, errors.New("[test] wrong signer") + case !assert.ObjectsAreEqual(opts, x.opts): + return nil, errors.New("[test] wrong options") + } + return x.rdr, x.err +} + +type objectSearchOnlyClientWrapper struct { + mockedClientWrapper + c objectSearchOnlyClient +} + +func (x objectSearchOnlyClientWrapper) getClient() (sdkClientInterface, error) { return x.c, nil } + +func TestPool_ObjectSearchInit(t *testing.T) { + ctx := context.Background() + cnrID := cidtest.ID() + usr := usertest.User() + + var sfs object.SearchFilters + sfs.AddFilter("k1", "v1", object.MatchStringEqual) + sfs.AddFilter("k2", "v2", object.MatchStringNotEqual) + + var searchOpts client.PrmObjectSearch + searchOpts.WithinSession(sessiontest.Object()) + searchOpts.WithBearerToken(bearertest.Token()) + searchOpts.MarkLocal() + searchOpts.WithXHeaders("k1", "v1", "k2", "v2") + searchOpts.SetFilters(sfs) + + searchClient := objectSearchOnlyClient{ + cnr: cnrID, + sgnr: usr, + opts: searchOpts, + rdr: nil, // no way to construct + err: errors.New("any error"), + } + endpoints := []string{"localhost:8080", "localhost:8081"} + nodes := make([]NodeParam, len(endpoints)) + cws := make([]objectSearchOnlyClientWrapper, len(endpoints)) + for i := range endpoints { + nodes[i].address = endpoints[i] + cws[i].addr = endpoints[i] + cws[i].c = searchClient + } + + var poolOpts InitParameters + poolOpts.setClientBuilder(func(endpoint string) (internalClient, error) { + ind := slices.Index(endpoints, endpoint) + if ind < 0 { + return nil, fmt.Errorf("unexpected endpoint %q", endpoint) + } + return &cws[ind], nil + }) + p, err := New(nodes, usertest.User().RFC6979, poolOpts) + require.NoError(t, err) + require.NoError(t, p.Dial(ctx)) + t.Cleanup(p.Close) + + rdr, err := p.ObjectSearchInit(context.Background(), cnrID, usr, searchOpts) + require.Equal(t, err, searchClient.err) + require.Equal(t, rdr, searchClient.rdr) +} diff --git a/pool/pool_test.go b/pool/pool_test.go index 58644f39..48447366 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -10,8 +10,10 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" neofscryptotest "github.com/nspcc-dev/neofs-sdk-go/crypto/test" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" "github.com/nspcc-dev/neofs-sdk-go/session" usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" "github.com/stretchr/testify/require" @@ -273,8 +275,11 @@ func TestSessionCache(t *testing.T) { cp, err := pool.connection() require.NoError(t, err) - var containerID cid.ID - cacheKey := cacheKeyForSession(cp.address(), pool.signer, session.VerbObjectGet, containerID) + containerID := cidtest.ID() + cacheKey := cacheKeyForSession(cp.address(), pool.signer, session.VerbObjectPut, containerID) + + hdr := objecttest.Object() + hdr.SetContainerID(containerID) t.Run("no session token after pool creation", func(t *testing.T) { st, ok := pool.cache.Get(cacheKey) @@ -283,7 +288,7 @@ func TestSessionCache(t *testing.T) { }) t.Run("session token was created after request", func(t *testing.T) { - _, _, err = pool.ObjectGetInit(ctx, containerID, oid.ID{}, usr, client.PrmObjectGet{}) + _, err = pool.ObjectPutInit(ctx, hdr, usr, client.PrmObjectPutInit{}) require.NoError(t, err) st, ok := pool.cache.Get(cacheKey) @@ -293,9 +298,9 @@ func TestSessionCache(t *testing.T) { t.Run("session is not removed", func(t *testing.T) { // error on the next request to the node - mockCli.statusOnGetObject(errors.New("some error")) + mockCli.statusOnPutObject(errors.New("some error")) - _, _, err = pool.ObjectGetInit(ctx, cid.ID{}, oid.ID{}, usr, client.PrmObjectGet{}) + _, err = pool.ObjectPutInit(ctx, hdr, usr, client.PrmObjectPutInit{}) require.Error(t, err) _, ok := pool.cache.Get(cacheKey) @@ -304,10 +309,10 @@ func TestSessionCache(t *testing.T) { t.Run("session is removed, because of the special error", func(t *testing.T) { // error on the next request to the node - mockCli.statusOnGetObject(apistatus.SessionTokenNotFound{}) + mockCli.statusOnPutObject(apistatus.SessionTokenNotFound{}) // make request, - _, _, err = pool.ObjectGetInit(ctx, cid.ID{}, oid.ID{}, usr, client.PrmObjectGet{}) + _, err = pool.ObjectPutInit(ctx, hdr, usr, client.PrmObjectPutInit{}) require.Error(t, err) // cache must not contain session token @@ -318,9 +323,9 @@ func TestSessionCache(t *testing.T) { }) t.Run("session created again", func(t *testing.T) { - mockCli.statusOnGetObject(nil) + mockCli.statusOnPutObject(nil) - _, _, err = pool.ObjectGetInit(ctx, cid.ID{}, oid.ID{}, usr, client.PrmObjectGet{}) + _, err = pool.ObjectPutInit(ctx, hdr, usr, client.PrmObjectPutInit{}) require.NoError(t, err) _, ok := pool.cache.Get(cacheKey) @@ -441,12 +446,14 @@ func TestSessionTokenOwner(t *testing.T) { anonSigner := usertest.User() - var containerID cid.ID + containerID := cidtest.ID() + hdr := objecttest.Object() + hdr.SetContainerID(containerID) - _, _, err = p.ObjectGetInit(ctx, containerID, oid.ID{}, anonSigner, client.PrmObjectGet{}) + _, err = p.ObjectPutInit(ctx, hdr, anonSigner, client.PrmObjectPutInit{}) require.NoError(t, err) - cacheKey := cacheKeyForSession(cp.address(), anonSigner, session.VerbObjectGet, containerID) + cacheKey := cacheKeyForSession(cp.address(), anonSigner, session.VerbObjectPut, containerID) st, ok := p.cache.Get(cacheKey) require.True(t, ok) require.True(t, st.AssertAuthKey(anonSigner.Public()))