Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/replicate with signature #622

Merged
merged 5 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 50 additions & 14 deletions client/object_replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"sync"

objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc"
Expand All @@ -32,6 +33,10 @@
// ReplicateObject is intended for maintaining data storage by NeoFS system
// nodes only, not for regular use.
//
// If signedReplication, client requests server to sign replicated object
// information to ensure replication was successful. Signature is returned
// (nil if not requested).
//
// Object must be encoded in compliance with Protocol Buffers v3 format in
// ascending order of fields.
//
Expand All @@ -48,38 +53,38 @@
// replicated object;
// - [apistatus.ErrContainerNotFound]: the container to which the replicated
// object is associated was not found.
func (c *Client) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error {
func (c *Client) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, signedReplication bool) (*neofscrypto.Signature, error) {
Copy link
Contributor

@cthulhu-rider cthulhu-rider Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in which cases signedReplication gonna be false in practice?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For non-initial replication?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also as i understand, it will stiil be possible to skip this for some sort of container after nspcc-dev/neofs-api#300, no @roman-khimov?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, containers with different consistency policies can have different requirements wrt this as well.

const svcName = "neo.fs.v2.object.ObjectService"
const opName = "Replicate"
stream, err := c.c.Init(common.CallMethodInfoUnary(svcName, opName),
client.WithContext(ctx), client.AllowBinarySendingOnly())
if err != nil {
return fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err)
return nil, fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err)

Check warning on line 62 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L62

Added line #L62 was not covered by tests
}

msg, err := prepareReplicateMessage(id, src, signer)
msg, err := prepareReplicateMessage(id, src, signer, signedReplication)
if err != nil {
return err
return nil, err

Check warning on line 67 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L67

Added line #L67 was not covered by tests
}

err = stream.WriteMessage(client.BinaryMessage(msg))
if err != nil && !errors.Is(err, io.EOF) { // io.EOF means the server closed the stream on its side
return fmt.Errorf("send request: %w", err)
return nil, fmt.Errorf("send request: %w", err)

Check warning on line 72 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L72

Added line #L72 was not covered by tests
}

var resp replicateResponse
resp := replicateResponse{_sigRequested: signedReplication}
err = stream.ReadMessage(&resp)
if err != nil {
if errors.Is(err, io.EOF) {
err = io.ErrUnexpectedEOF
}

return fmt.Errorf("recv response: %w", err)
return nil, fmt.Errorf("recv response: %w", err)
}

_ = stream.Close()

return resp.err
return resp.objSig, resp.err
}

// DemuxReplicatedObject allows to share same argument between multiple
Expand Down Expand Up @@ -108,23 +113,23 @@
return x.rs.Seek(offset, whence)
}

func prepareReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) {
func prepareReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, signedReplication bool) ([]byte, error) {
srm, ok := src.(*demuxReplicationMessage)
if !ok {
return newReplicateMessage(id, src, signer)
return newReplicateMessage(id, src, signer, signedReplication)
}

srm.mtx.Lock()
defer srm.mtx.Unlock()

if srm.msg == nil && srm.err == nil {
srm.msg, srm.err = newReplicateMessage(id, src, signer)
srm.msg, srm.err = newReplicateMessage(id, src, signer, signedReplication)
}

return srm.msg, srm.err
}

func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) {
func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, requireObjectSignature bool) ([]byte, error) {
var objSize uint64
switch v := src.(type) {
default:
Expand Down Expand Up @@ -169,13 +174,15 @@

const fieldNumObject = 1
const fieldNumSignature = 2
const fieldNumSignObjectFlag = 3

sigSize := protowire.SizeTag(fieldNumSigPubKey) + protowire.SizeBytes(len(bPubKey)) +
protowire.SizeTag(fieldNumSigVal) + protowire.SizeBytes(len(idSig)) +
protowire.SizeTag(fieldNumSigScheme) + protowire.SizeVarint(sigScheme)

msgSize := protowire.SizeTag(fieldNumObject) + protowire.SizeVarint(objSize) +
protowire.SizeTag(fieldNumSignature) + protowire.SizeBytes(sigSize)
protowire.SizeTag(fieldNumSignature) + protowire.SizeBytes(sigSize) +
protowire.SizeTag(fieldNumSignObjectFlag) + protowire.SizeVarint(protowire.EncodeBool(requireObjectSignature))

// TODO(#544): support external buffers
msg := make([]byte, 0, uint64(msgSize)+objSize)
Expand All @@ -198,12 +205,17 @@
msg = protowire.AppendBytes(msg, idSig)
msg = protowire.AppendTag(msg, fieldNumSigScheme, protowire.VarintType)
msg = protowire.AppendVarint(msg, sigScheme)
msg = protowire.AppendTag(msg, fieldNumSignObjectFlag, protowire.VarintType)
msg = protowire.AppendVarint(msg, protowire.EncodeBool(requireObjectSignature))

return msg, nil
}

type replicateResponse struct {
err error
_sigRequested bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C-style?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks weird. And it's not a response field, really.

Copy link
Member Author

@carpawell carpawell Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only possibility of understanding a signature is missing but was requested in the current code. we also may just do nothing if it is missing and return nil, it is just an RPC wrapper without any logic then, maybe that is good too, not sure

And it's not a response field, really

yes, that is why it is underscored...

Looks weird

... which is available by the language and not prohibited by any of our linters


objSig *neofscrypto.Signature
err error
}

func (x replicateResponse) ToGRPCMessage() grpc.Message {
Expand All @@ -226,6 +238,30 @@
}

x.err = apistatus.ErrorFromV2(st)
if x.err != nil {
return nil
}

if !x._sigRequested {
return nil
}

sig := m.GetObjectSignature()
if sig == nil {
return errors.New("requested but missing signature")

Check warning on line 251 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L251

Added line #L251 was not covered by tests
}

sigV2 := new(refs.Signature)
err := sigV2.Unmarshal(sig)
if err != nil {
return fmt.Errorf("decoding signature from proto message: %w", err)

Check warning on line 257 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L257

Added line #L257 was not covered by tests
}

x.objSig = new(neofscrypto.Signature)
err = x.objSig.ReadFromV2(*sigV2)
if err != nil {
return fmt.Errorf("invalid signature: %w", err)

Check warning on line 263 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L263

Added line #L263 was not covered by tests
}

return nil
}
43 changes: 36 additions & 7 deletions client/object_replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
Expand Down Expand Up @@ -37,11 +38,13 @@ func BenchmarkPrepareReplicationMessage(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, err = prepareReplicateMessage(id, bytes.NewReader(bObj), signer)
_, err = prepareReplicateMessage(id, bytes.NewReader(bObj), signer, true)
require.NoError(b, err)
}
}

var testDataToSign = []byte("requested data to sign")

type testReplicationServer struct {
objectgrpc.UnimplementedObjectServiceServer

Expand All @@ -64,7 +67,7 @@ func (x *testReplicationServer) Replicate(_ context.Context, req *objectgrpc.Rep
}

sigMsg := req.GetSignature()
if objMsg == nil {
if sigMsg == nil {
st.Code = 1024 // internal error
st.Message = "missing signature field"
resp.Status = &st
Expand Down Expand Up @@ -121,6 +124,23 @@ func (x *testReplicationServer) Replicate(_ context.Context, req *objectgrpc.Rep
return &resp, nil
}

if req.GetSignObject() {
var sig neofscrypto.Signature
err = sig.Calculate(x.clientSigner, testDataToSign)
if err != nil {
st.Code = 1024
st.Message = fmt.Sprintf("signing object information: %s", err)
resp.Status = &st

return &resp, nil
}

var sigV2 refs.Signature
sig.WriteToV2(&sigV2)

resp.ObjectSignature = sigV2.StableMarshal(nil)
}

resp.Status = &status.Status{Code: x.respStatusCode}
return &resp, nil
}
Expand Down Expand Up @@ -162,15 +182,15 @@ func TestClient_ReplicateObject(t *testing.T) {
srv, cli := serveObjectReplication(t, signer, obj)
srv.respStatusCode = 0

err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer)
_, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, false)
require.NoError(t, err)
})

t.Run("invalid binary object", func(t *testing.T) {
bObj := []byte("Hello, world!") // definitely incorrect binary object
_, cli := serveObjectReplication(t, signer, obj)

err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer)
_, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, false)
require.Error(t, err)
})

Expand All @@ -187,16 +207,25 @@ func TestClient_ReplicateObject(t *testing.T) {
srv, cli := serveObjectReplication(t, signer, obj)
srv.respStatusCode = tc.code

err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer)
_, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, false)
require.ErrorIs(t, err, tc.expErr, tc.desc)
}
})

t.Run("sign object data", func(t *testing.T) {
srv, cli := serveObjectReplication(t, signer, obj)
srv.respStatusCode = 0

sig, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, true)
require.NoError(t, err)
require.True(t, sig.Verify(testDataToSign))
})

t.Run("demux", func(t *testing.T) {
demuxObj := DemuxReplicatedObject(bytes.NewReader(bObj))
_, cli := serveObjectReplication(t, signer, obj)

err := cli.ReplicateObject(ctx, id, demuxObj, signer)
_, err := cli.ReplicateObject(ctx, id, demuxObj, signer, false)
require.NoError(t, err)

msgCp := bytes.Clone(demuxObj.(*demuxReplicationMessage).msg)
Expand All @@ -208,7 +237,7 @@ func TestClient_ReplicateObject(t *testing.T) {
go func() {
defer wg.Done()

err := cli.ReplicateObject(ctx, id, demuxObj, signer)
_, err := cli.ReplicateObject(ctx, id, demuxObj, signer, false)
fmt.Println(err)
require.NoError(t, err)
}()
Expand Down
10 changes: 5 additions & 5 deletions eacl/test/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func baseBenchmarkTableBinaryComparison(b *testing.B, factor int) {
b.StopTimer()
b.ResetTimer()
b.StartTimer()
for i := 0; i < b.N; i++ {
for range b.N {
if !bytes.Equal(exp, t.Marshal()) {
b.Fail()
}
Expand All @@ -33,7 +33,7 @@ func baseBenchmarkTableEqualsComparison(b *testing.B, factor int) {
b.StopTimer()
b.ResetTimer()
b.StartTimer()
for i := 0; i < b.N; i++ {
for range b.N {
if !eacl.EqualTables(*t, t2) {
b.Fail()
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func TargetN(n int) *eacl.Target {
x.SetRole(eacl.RoleSystem)
keys := make([][]byte, n)

for i := 0; i < n; i++ {
for i := range n {
keys[i] = make([]byte, 32)
//nolint:staticcheck
rand.Read(keys[i])
Expand All @@ -85,7 +85,7 @@ func TargetN(n int) *eacl.Target {
// Record returns random eacl.Record.
func RecordN(n int) *eacl.Record {
fs := make([]eacl.Filter, n)
for i := 0; i < n; i++ {
for i := range n {
fs[i] = eacl.ConstructFilter(eacl.HeaderFromObject, "", eacl.MatchStringEqual, cidtest.ID().EncodeToString())
}

Expand All @@ -95,7 +95,7 @@ func RecordN(n int) *eacl.Record {

func TableN(n int) *eacl.Table {
rs := make([]eacl.Record, n)
for i := 0; i < n; i++ {
for i := range n {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really related.

rs[i] = *RecordN(n)
}
x := eacl.NewTableForContainer(cidtest.ID(), rs)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/mr-tron/base58 v1.2.0
github.com/nspcc-dev/hrw/v2 v2.0.2
github.com/nspcc-dev/neo-go v0.106.3
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240305074711-35bc78d84dc4
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea
github.com/nspcc-dev/tzhash v1.8.2
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.33.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ github.com/nspcc-dev/hrw/v2 v2.0.2 h1:Vuc2Yu96MCv1YDUjErMuCt5tq+g/43/Y89u/XfyLkR
github.com/nspcc-dev/hrw/v2 v2.0.2/go.mod h1:XRsG20axGJfr0Ytcau/UcZ/9NF54RmUIqmoYKuuliSo=
github.com/nspcc-dev/neo-go v0.106.3 h1:HEyhgkjQY+HfBzotMJ12xx2VuOUphkngZ4kEkjvXDtE=
github.com/nspcc-dev/neo-go v0.106.3/go.mod h1:3vEwJ2ld12N7HRGCaH/l/7EwopplC/+8XdIdPDNmD/M=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240305074711-35bc78d84dc4 h1:arN0Ypn+jawZpu1BND7TGRn44InAVIqKygndsx0y2no=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240305074711-35bc78d84dc4/go.mod h1:7Tm1NKEoUVVIUlkVwFrPh7GG5+Lmta2m7EGr4oVpBd8=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea h1:mK0EMGLvunXcFyq7fBURS/CsN4MH+4nlYiqn6pTwWAU=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea/go.mod h1:YzhD4EZmC9Z/PNyd7ysC7WXgIgURc9uCG1UWDeV027Y=
github.com/nspcc-dev/rfc6979 v0.2.1 h1:8wWxkamHWFmO790GsewSoKUSJjVnL1fmdRpokU/RgRM=
github.com/nspcc-dev/rfc6979 v0.2.1/go.mod h1:Tk7h5kyUWkhjyO3zUgFFhy1v2vQv3BvQEntakdtqrWc=
github.com/nspcc-dev/tzhash v1.8.2 h1:ebRCbPoEuoqrhC6sSZmrT/jI3h1SzCWakxxV6gp5QAg=
Expand Down
2 changes: 1 addition & 1 deletion pool/pool_aio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func testPoolInterfaceWithAIO(t *testing.T, nodeAddr string) {
})

times := int(opts.sessionExpirationDuration * 3)
for i := 0; i < times; i++ {
for range times {
epoch, err := tickNewEpoch(ctx, pool)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestOneOfTwoFailed(t *testing.T) {

time.Sleep(2 * time.Second)

for i := 0; i < 5; i++ {
for range 5 {
cp, err := pool.connection()
require.NoError(t, err)
require.True(t, assertAuthKeyForAny(cp.address(), nodes))
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestStatusMonitor(t *testing.T) {
monitor.errorThreshold = 3

count := 10
for i := 0; i < count; i++ {
for range count {
monitor.incErrorRate()
}

Expand Down
2 changes: 1 addition & 1 deletion pool/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestSamplerStability(t *testing.T) {
for _, tc := range cases {
sampler := newSampler(tc.probabilities, rand.NewSource(0))
res := make([]int, len(tc.probabilities))
for i := 0; i < COUNT; i++ {
for range COUNT {
res[sampler.Next()]++
}

Expand Down
3 changes: 2 additions & 1 deletion proto/acl/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading