Skip to content

Commit

Permalink
node/object/put: exchange meta signatures during replication
Browse files Browse the repository at this point in the history
Initial replication requires nodes to sign object's main meta information and
respond with it. Meta information is not sent on wire and treated as a fixed
ordered NEO's map. Signatures are verified, not stored/send anywhere yet.
It follows the recent API extension: nspcc-dev/neofs-api#299.
Further, this extension is planned to have a contract adoption:
nspcc-dev/neofs-contract#413 and nspcc-dev/neofs-contract#414.
Closes #2876.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell committed Sep 10, 2024
1 parent 6a8c835 commit b724d14
Show file tree
Hide file tree
Showing 13 changed files with 350 additions and 45 deletions.
3 changes: 2 additions & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
Expand Down Expand Up @@ -350,7 +351,7 @@ func initObjectService(c *cfg) {
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
}

server := objectTransportGRPC.New(firstSvc, objNode)
server := objectTransportGRPC.New(firstSvc, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey))

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down
36 changes: 31 additions & 5 deletions cmd/neofs-node/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"fmt"

objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/message"
"github.com/nspcc-dev/neofs-api-go/v2/status"
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
)

type transport struct {
Expand All @@ -20,27 +22,31 @@ type transport struct {

// SendReplicationRequestToNode connects to described node and sends prepared
// replication request message to it.
func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) error {
func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) (*neofscrypto.Signature, error) {
c, err := x.clients.Get(node)
if err != nil {
return fmt.Errorf("connect to remote node: %w", err)
return nil, fmt.Errorf("connect to remote node: %w", err)
}

return c.ExecRaw(func(c *rawclient.Client) error {
var resp replicateResponse
err = c.ExecRaw(func(c *rawclient.Client) error {
// this will be changed during NeoFS API Go deprecation. Code most likely be
// placed in SDK
m := common.CallMethodInfo{Service: "neo.fs.v2.object.ObjectService", Name: "Replicate"}
var resp replicateResponse
err = rawclient.SendUnary(c, m, rawclient.BinaryMessage(req), &resp,
rawclient.WithContext(ctx), rawclient.AllowBinarySendingOnly())
if err != nil {
return fmt.Errorf("API transport (service=%s,op=%s): %w", m.Service, m.Name, err)
}
return resp.err
})
return resp.sig, err
}

type replicateResponse struct{ err error }
type replicateResponse struct {
sig *neofscrypto.Signature
err error
}

func (x replicateResponse) ToGRPCMessage() grpc.Message { return new(objectGRPC.ReplicateResponse) }

Expand All @@ -60,6 +66,26 @@ func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error {
}

x.err = apistatus.ErrorFromV2(st)
if x.err != nil {
return nil
}

sig := m.GetObjectSignature()
if sig == nil {
return nil
}

sigV2 := new(refs.Signature)
err := sigV2.Unmarshal(sig)
if err != nil {
return fmt.Errorf("decoding signature from proto message: %w", err)
}

x.sig = new(neofscrypto.Signature)
err = x.sig.ReadFromV2(*sigV2)
if err != nil {
return fmt.Errorf("invalid signature: %w", err)
}

return nil
}
73 changes: 73 additions & 0 deletions pkg/core/object/replicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package object

import (
"fmt"

"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

const (
validInterval = 10 // in epochs
currentVersion = 6 // it is also a number of fields
)

const (
cidKey = "cid"
oidKey = "oid"
sizeKey = "size"
deletedKey = "deleted"
lockedKey = "locked"
validUntilKey = "validuntil"
)

// EncodeReplicationMetaInfo uses NEO's map (strict order) serialized format as a raw
// representation of object's meta information.
//
// This (ordered) format is used (keys are strings):
//
// "cid": _raw_ container ID (32 bytes)
// "oid": _raw_ object ID (32 bytes)
// "size": payload size
// "deleted": array of _raw_ object IDs
// "locked": array of _raw_ object IDs
// "validuntil": last valid epoch number for meta information
//
// Last valid epoch is object's creation epoch + 10.
func EncodeReplicationMetaInfo(cID cid.ID, oID oid.ID, pSize uint64, deleted, locked []oid.ID, createdAt uint64) []byte {
kvs := []stackitem.MapElement{
kv(cidKey, cID[:]),
kv(oidKey, oID[:]),
kv(sizeKey, pSize),
oidsKV(deletedKey, deleted),
oidsKV(lockedKey, locked),
kv(validUntilKey, createdAt+validInterval),
}

result, err := stackitem.Serialize(stackitem.NewMapWithValue(kvs))
if err != nil {
// all the errors in the stackitem relate only cases when it is
// impossible to use serialized values (too many values, unsupported
// types, etc.), unexpected errors at all
panic(fmt.Errorf("unexpected stackitem map serialization failure: %v", err))
}

return result
}

func kv(k string, value any) stackitem.MapElement {
return stackitem.MapElement{
Key: stackitem.Make(k),
Value: stackitem.Make(value),
}
}

func oidsKV(fieldKey string, oIDs []oid.ID) stackitem.MapElement {
res := make([]stackitem.Item, 0, len(oIDs))
for _, oID := range oIDs {
res = append(res, stackitem.NewByteArray(oID[:]))
}

return kv(fieldKey, res)
}
65 changes: 65 additions & 0 deletions pkg/core/object/replicate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package object

import (
"math/big"
"math/rand/v2"
"testing"

"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)

func TestMetaInfo(t *testing.T) {
oID := oidtest.ID()
cID := cidtest.ID()
size := rand.Uint64()
deleted := oidtest.IDs(10)
locked := oidtest.IDs(10)
validUntil := rand.Uint64()

raw := EncodeReplicationMetaInfo(cID, oID, size, deleted, locked, validUntil)
item, err := stackitem.Deserialize(raw)
require.NoError(t, err)

require.Equal(t, stackitem.MapT, item.Type())
mm, ok := item.Value().([]stackitem.MapElement)
require.True(t, ok)

require.Len(t, mm, currentVersion)

require.Equal(t, cidKey, string(mm[0].Key.Value().([]byte)))
require.Equal(t, cID[:], mm[0].Value.Value().([]byte))

require.Equal(t, oidKey, string(mm[1].Key.Value().([]byte)))
require.Equal(t, oID[:], mm[1].Value.Value().([]byte))

require.Equal(t, sizeKey, string(mm[2].Key.Value().([]byte)))
require.Equal(t, size, mm[2].Value.Value().(*big.Int).Uint64())

require.Equal(t, deletedKey, string(mm[3].Key.Value().([]byte)))
require.Equal(t, deleted, stackItemToOIDs(t, mm[3].Value))

require.Equal(t, lockedKey, string(mm[4].Key.Value().([]byte)))
require.Equal(t, locked, stackItemToOIDs(t, mm[4].Value))

require.Equal(t, validUntilKey, string(mm[5].Key.Value().([]byte)))
require.Equal(t, validUntil+validInterval, mm[5].Value.Value().(*big.Int).Uint64())
}

func stackItemToOIDs(t *testing.T, value stackitem.Item) []oid.ID {
value, ok := value.(*stackitem.Array)
require.True(t, ok)

vv := value.Value().([]stackitem.Item)
res := make([]oid.ID, 0, len(vv))

for _, v := range vv {
raw := v.Value().([]byte)
res = append(res, oid.ID(raw))
}

return res
}
53 changes: 52 additions & 1 deletion pkg/network/transport/object/grpc/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
refsv2 "github.com/nspcc-dev/neofs-api-go/v2/refs"
refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc"
status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// Replicate serves neo.fs.v2.object.ObjectService/Replicate RPC.
Expand Down Expand Up @@ -178,7 +180,18 @@ func (s *Server) Replicate(_ context.Context, req *objectGRPC.ReplicateRequest)
}}, nil
}

return new(objectGRPC.ReplicateResponse), nil
resp := new(objectGRPC.ReplicateResponse)
if req.GetSignObject() {
resp.ObjectSignature, err = s.metaInfoSignature(*obj)
if err != nil {
return &objectGRPC.ReplicateResponse{Status: &status.Status{
Code: codeInternal,
Message: fmt.Sprintf("failed to sign object meta information: %v", err),
}}, nil
}
}

return resp, nil
}

func objectFromMessage(gMsg *objectGRPC.Object) (*object.Object, error) {
Expand All @@ -190,3 +203,41 @@ func objectFromMessage(gMsg *objectGRPC.Object) (*object.Object, error) {

return object.NewFromV2(&msg), nil
}

func (s *Server) metaInfoSignature(o object.Object) ([]byte, error) {
var deleted []oid.ID
var locked []oid.ID
switch o.Type() {
case object.TypeTombstone:
var t object.Tombstone
err := t.Unmarshal(o.Payload())
if err != nil {
return nil, fmt.Errorf("reading tombstoned objects: %w", err)
}

deleted = t.Members()
case object.TypeLock:
var l object.Lock
err := l.Unmarshal(o.Payload())
if err != nil {
return nil, fmt.Errorf("reading locked objects: %w", err)
}

locked = make([]oid.ID, l.NumberOfMembers())
l.ReadMembers(locked)
default:
}

metaInfo := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, o.CreationEpoch())

var sig neofscrypto.Signature
err := sig.Calculate(s.signer, metaInfo)
if err != nil {
return nil, fmt.Errorf("signature failure: %w", err)
}

sigV2 := new(refsv2.Signature)
sig.WriteToV2(sigV2)

return sigV2.StableMarshal(nil), nil
}
Loading

0 comments on commit b724d14

Please sign in to comment.