-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/replicate with signature #622
Changes from all commits
c965ec2
73bfe96
3e7f660
322933b
b322ea8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
"sync" | ||
|
||
objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" | ||
"github.com/nspcc-dev/neofs-api-go/v2/refs" | ||
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client" | ||
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common" | ||
"github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" | ||
|
@@ -32,6 +33,10 @@ | |
// ReplicateObject is intended for maintaining data storage by NeoFS system | ||
// nodes only, not for regular use. | ||
// | ||
// If signedReplication, client requests server to sign replicated object | ||
// information to ensure replication was successful. Signature is returned | ||
// (nil if not requested). | ||
// | ||
// Object must be encoded in compliance with Protocol Buffers v3 format in | ||
// ascending order of fields. | ||
// | ||
|
@@ -48,38 +53,38 @@ | |
// replicated object; | ||
// - [apistatus.ErrContainerNotFound]: the container to which the replicated | ||
// object is associated was not found. | ||
func (c *Client) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error { | ||
func (c *Client) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, signedReplication bool) (*neofscrypto.Signature, error) { | ||
const svcName = "neo.fs.v2.object.ObjectService" | ||
const opName = "Replicate" | ||
stream, err := c.c.Init(common.CallMethodInfoUnary(svcName, opName), | ||
client.WithContext(ctx), client.AllowBinarySendingOnly()) | ||
if err != nil { | ||
return fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err) | ||
return nil, fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err) | ||
} | ||
|
||
msg, err := prepareReplicateMessage(id, src, signer) | ||
msg, err := prepareReplicateMessage(id, src, signer, signedReplication) | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
|
||
err = stream.WriteMessage(client.BinaryMessage(msg)) | ||
if err != nil && !errors.Is(err, io.EOF) { // io.EOF means the server closed the stream on its side | ||
return fmt.Errorf("send request: %w", err) | ||
return nil, fmt.Errorf("send request: %w", err) | ||
} | ||
|
||
var resp replicateResponse | ||
resp := replicateResponse{_sigRequested: signedReplication} | ||
err = stream.ReadMessage(&resp) | ||
if err != nil { | ||
if errors.Is(err, io.EOF) { | ||
err = io.ErrUnexpectedEOF | ||
} | ||
|
||
return fmt.Errorf("recv response: %w", err) | ||
return nil, fmt.Errorf("recv response: %w", err) | ||
} | ||
|
||
_ = stream.Close() | ||
|
||
return resp.err | ||
return resp.objSig, resp.err | ||
} | ||
|
||
// DemuxReplicatedObject allows to share same argument between multiple | ||
|
@@ -108,23 +113,23 @@ | |
return x.rs.Seek(offset, whence) | ||
} | ||
|
||
func prepareReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) { | ||
func prepareReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, signedReplication bool) ([]byte, error) { | ||
srm, ok := src.(*demuxReplicationMessage) | ||
if !ok { | ||
return newReplicateMessage(id, src, signer) | ||
return newReplicateMessage(id, src, signer, signedReplication) | ||
} | ||
|
||
srm.mtx.Lock() | ||
defer srm.mtx.Unlock() | ||
|
||
if srm.msg == nil && srm.err == nil { | ||
srm.msg, srm.err = newReplicateMessage(id, src, signer) | ||
srm.msg, srm.err = newReplicateMessage(id, src, signer, signedReplication) | ||
} | ||
|
||
return srm.msg, srm.err | ||
} | ||
|
||
func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) { | ||
func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, requireObjectSignature bool) ([]byte, error) { | ||
var objSize uint64 | ||
switch v := src.(type) { | ||
default: | ||
|
@@ -169,13 +174,15 @@ | |
|
||
const fieldNumObject = 1 | ||
const fieldNumSignature = 2 | ||
const fieldNumSignObjectFlag = 3 | ||
|
||
sigSize := protowire.SizeTag(fieldNumSigPubKey) + protowire.SizeBytes(len(bPubKey)) + | ||
protowire.SizeTag(fieldNumSigVal) + protowire.SizeBytes(len(idSig)) + | ||
protowire.SizeTag(fieldNumSigScheme) + protowire.SizeVarint(sigScheme) | ||
|
||
msgSize := protowire.SizeTag(fieldNumObject) + protowire.SizeVarint(objSize) + | ||
protowire.SizeTag(fieldNumSignature) + protowire.SizeBytes(sigSize) | ||
protowire.SizeTag(fieldNumSignature) + protowire.SizeBytes(sigSize) + | ||
protowire.SizeTag(fieldNumSignObjectFlag) + protowire.SizeVarint(protowire.EncodeBool(requireObjectSignature)) | ||
|
||
// TODO(#544): support external buffers | ||
msg := make([]byte, 0, uint64(msgSize)+objSize) | ||
|
@@ -198,12 +205,17 @@ | |
msg = protowire.AppendBytes(msg, idSig) | ||
msg = protowire.AppendTag(msg, fieldNumSigScheme, protowire.VarintType) | ||
msg = protowire.AppendVarint(msg, sigScheme) | ||
msg = protowire.AppendTag(msg, fieldNumSignObjectFlag, protowire.VarintType) | ||
msg = protowire.AppendVarint(msg, protowire.EncodeBool(requireObjectSignature)) | ||
|
||
return msg, nil | ||
} | ||
|
||
type replicateResponse struct { | ||
err error | ||
_sigRequested bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. C-style? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks weird. And it's not a response field, really. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is the only possibility of understanding a signature is missing but was requested in the current code. we also may just do nothing if it is missing and return nil, it is just an RPC wrapper without any logic then, maybe that is good too, not sure
yes, that is why it is underscored...
... which is available by the language and not prohibited by any of our linters |
||
|
||
objSig *neofscrypto.Signature | ||
err error | ||
} | ||
|
||
func (x replicateResponse) ToGRPCMessage() grpc.Message { | ||
|
@@ -226,6 +238,30 @@ | |
} | ||
|
||
x.err = apistatus.ErrorFromV2(st) | ||
if x.err != nil { | ||
return nil | ||
} | ||
|
||
if !x._sigRequested { | ||
return nil | ||
} | ||
|
||
sig := m.GetObjectSignature() | ||
if sig == nil { | ||
return errors.New("requested but missing signature") | ||
} | ||
|
||
sigV2 := new(refs.Signature) | ||
err := sigV2.Unmarshal(sig) | ||
if err != nil { | ||
return fmt.Errorf("decoding signature from proto message: %w", err) | ||
} | ||
|
||
x.objSig = new(neofscrypto.Signature) | ||
err = x.objSig.ReadFromV2(*sigV2) | ||
if err != nil { | ||
return fmt.Errorf("invalid signature: %w", err) | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ func baseBenchmarkTableBinaryComparison(b *testing.B, factor int) { | |
b.StopTimer() | ||
b.ResetTimer() | ||
b.StartTimer() | ||
for i := 0; i < b.N; i++ { | ||
for range b.N { | ||
if !bytes.Equal(exp, t.Marshal()) { | ||
b.Fail() | ||
} | ||
|
@@ -33,7 +33,7 @@ func baseBenchmarkTableEqualsComparison(b *testing.B, factor int) { | |
b.StopTimer() | ||
b.ResetTimer() | ||
b.StartTimer() | ||
for i := 0; i < b.N; i++ { | ||
for range b.N { | ||
if !eacl.EqualTables(*t, t2) { | ||
b.Fail() | ||
} | ||
|
@@ -71,7 +71,7 @@ func TargetN(n int) *eacl.Target { | |
x.SetRole(eacl.RoleSystem) | ||
keys := make([][]byte, n) | ||
|
||
for i := 0; i < n; i++ { | ||
for i := range n { | ||
keys[i] = make([]byte, 32) | ||
//nolint:staticcheck | ||
rand.Read(keys[i]) | ||
|
@@ -85,7 +85,7 @@ func TargetN(n int) *eacl.Target { | |
// Record returns random eacl.Record. | ||
func RecordN(n int) *eacl.Record { | ||
fs := make([]eacl.Filter, n) | ||
for i := 0; i < n; i++ { | ||
for i := range n { | ||
fs[i] = eacl.ConstructFilter(eacl.HeaderFromObject, "", eacl.MatchStringEqual, cidtest.ID().EncodeToString()) | ||
} | ||
|
||
|
@@ -95,7 +95,7 @@ func RecordN(n int) *eacl.Record { | |
|
||
func TableN(n int) *eacl.Table { | ||
rs := make([]eacl.Record, n) | ||
for i := 0; i < n; i++ { | ||
for i := range n { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really related. |
||
rs[i] = *RecordN(n) | ||
} | ||
x := eacl.NewTableForContainer(cidtest.ID(), rs) | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in which cases
signedReplication
gonna be false in practice?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For non-initial replication?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also as i understand, it will stiil be possible to skip this for some sort of container after nspcc-dev/neofs-api#300, no @roman-khimov?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, containers with different consistency policies can have different requirements wrt this as well.