From 5bcb5e0b0884e05e2aebf19fe7f349dd98a72ce0 Mon Sep 17 00:00:00 2001 From: Manish Sethi Date: Wed, 23 Sep 2020 13:15:37 -0400 Subject: [PATCH] Implement interface SnapshotPvtdataHashesConsumer for pvtdata store (#1908) Signed-off-by: manish --- core/chaincode/implicitcollection/name.go | 4 + .../chaincode/implicitcollection/name_test.go | 6 + core/common/privdata/membershipinfo.go | 13 +- core/common/privdata/membershipinfo_test.go | 9 + .../confighistorytest/confighistory.go | 86 ++ .../confighistorytest/confighistory_test.go | 107 ++ core/ledger/ledger_interface.go | 2 + core/ledger/mock/membership_info_provider.go | 64 + core/ledger/pvtdatapolicy/btlpolicy.go | 6 +- core/ledger/pvtdatastorage/helper.go | 23 +- core/ledger/pvtdatastorage/kv_encoding.go | 28 +- .../pvtdatastorage/persistent_msgs.pb.go | 232 +++- .../pvtdatastorage/persistent_msgs.proto | 19 +- .../pvtdatastorage/persistent_msgs_helper.go | 50 +- .../pvtdatastorage/snapshot_data_importer.go | 329 ++++++ .../snapshot_data_importer_test.go | 1034 +++++++++++++++++ core/ledger/pvtdatastorage/store.go | 13 +- 17 files changed, 1930 insertions(+), 95 deletions(-) create mode 100644 core/ledger/confighistory/confighistorytest/confighistory.go create mode 100644 core/ledger/confighistory/confighistorytest/confighistory_test.go create mode 100644 core/ledger/pvtdatastorage/snapshot_data_importer.go create mode 100644 core/ledger/pvtdatastorage/snapshot_data_importer_test.go diff --git a/core/chaincode/implicitcollection/name.go b/core/chaincode/implicitcollection/name.go index 8c7c4711c93..d6f3d26c05e 100644 --- a/core/chaincode/implicitcollection/name.go +++ b/core/chaincode/implicitcollection/name.go @@ -27,3 +27,7 @@ func MspIDIfImplicitCollection(collectionName string) (isImplicitCollection bool } return true, collectionName[len(prefix):] } + +func IsImplicitCollection(collectionName string) bool { + return strings.HasPrefix(collectionName, prefix) +} diff --git a/core/chaincode/implicitcollection/name_test.go b/core/chaincode/implicitcollection/name_test.go index 3264c32febb..01ba92a6255 100644 --- a/core/chaincode/implicitcollection/name_test.go +++ b/core/chaincode/implicitcollection/name_test.go @@ -53,3 +53,9 @@ func TestMspIDIfImplicitCollection(t *testing.T) { }) } } + +func TestIsImplicitCollection(t *testing.T) { + require.True(t, IsImplicitCollection("_implicit_org_")) + require.True(t, IsImplicitCollection("_implicit_org_MyOrg")) + require.False(t, IsImplicitCollection("implicit_org_MyOrg")) +} diff --git a/core/common/privdata/membershipinfo.go b/core/common/privdata/membershipinfo.go index 6f5aa294ae2..33958c15e9e 100644 --- a/core/common/privdata/membershipinfo.go +++ b/core/common/privdata/membershipinfo.go @@ -9,6 +9,7 @@ package privdata import ( "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/core/chaincode/implicitcollection" "github.com/hyperledger/fabric/msp" "github.com/hyperledger/fabric/protoutil" ) @@ -20,11 +21,17 @@ type MembershipProvider struct { mspID string selfSignedData protoutil.SignedData IdentityDeserializerFactory func(chainID string) msp.IdentityDeserializer + myImplicitCollectionName string } // NewMembershipInfoProvider returns MembershipProvider func NewMembershipInfoProvider(mspID string, selfSignedData protoutil.SignedData, identityDeserializerFunc func(chainID string) msp.IdentityDeserializer) *MembershipProvider { - return &MembershipProvider{mspID: mspID, selfSignedData: selfSignedData, IdentityDeserializerFactory: identityDeserializerFunc} + return &MembershipProvider{ + mspID: mspID, + selfSignedData: selfSignedData, + IdentityDeserializerFactory: identityDeserializerFunc, + myImplicitCollectionName: implicitcollection.NameForOrg(mspID), + } } // AmMemberOf checks whether the current peer is a member of the given collection config. @@ -56,3 +63,7 @@ func (m *MembershipProvider) AmMemberOf(channelName string, collectionPolicyConf return true, nil } + +func (m *MembershipProvider) MyImplicitCollectionName() string { + return m.myImplicitCollectionName +} diff --git a/core/common/privdata/membershipinfo_test.go b/core/common/privdata/membershipinfo_test.go index 673d22bbe4a..b14c82d8bac 100644 --- a/core/common/privdata/membershipinfo_test.go +++ b/core/common/privdata/membershipinfo_test.go @@ -11,6 +11,7 @@ import ( "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/policydsl" + "github.com/hyperledger/fabric/core/chaincode/implicitcollection" "github.com/hyperledger/fabric/msp" "github.com/hyperledger/fabric/protoutil" "github.com/stretchr/testify/require" @@ -63,6 +64,14 @@ func TestMembershipInfoProvider(t *testing.T) { require.Nil(t, err) } +func TestMyImplicitCollectionName(t *testing.T) { + require.Equal( + t, + implicitcollection.NameForOrg("my_org"), + NewMembershipInfoProvider("my_org", protoutil.SignedData{}, nil).MyImplicitCollectionName(), + ) +} + func getAccessPolicy(signers []string) *peer.CollectionPolicyConfig { var data [][]byte for _, signer := range signers { diff --git a/core/ledger/confighistory/confighistorytest/confighistory.go b/core/ledger/confighistory/confighistorytest/confighistory.go new file mode 100644 index 00000000000..89f41b6e3bc --- /dev/null +++ b/core/ledger/confighistory/confighistorytest/confighistory.go @@ -0,0 +1,86 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package confighistorytest + +import ( + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/confighistory" + "github.com/hyperledger/fabric/core/ledger/mock" +) + +type Mgr struct { + *confighistory.Mgr + mockCCInfoProvider *mock.DeployedChaincodeInfoProvider +} + +func NewMgr(dbPath string) (*Mgr, error) { + mockCCInfoProvider := &mock.DeployedChaincodeInfoProvider{} + configHistory, err := confighistory.NewMgr(dbPath, mockCCInfoProvider) + if err != nil { + return nil, err + } + return &Mgr{ + Mgr: configHistory, + mockCCInfoProvider: mockCCInfoProvider, + }, nil +} + +func (m *Mgr) Setup(ledgerID, namespace string, configHistory map[uint64][]*peer.StaticCollectionConfig) error { + for committingBlk, config := range configHistory { + m.mockCCInfoProvider.UpdatedChaincodesReturns( + []*ledger.ChaincodeLifecycleInfo{ + { + Name: namespace, + }, + }, nil, + ) + + m.mockCCInfoProvider.ChaincodeInfoReturns( + &ledger.DeployedChaincodeInfo{ + Name: namespace, + ExplicitCollectionConfigPkg: BuildCollConfigPkg(config), + }, + nil, + ) + + err := m.HandleStateUpdates( + &ledger.StateUpdateTrigger{ + LedgerID: ledgerID, + CommittingBlockNum: committingBlk, + }, + ) + defer m.StateCommitDone(ledgerID) + if err != nil { + return err + } + } + return nil +} + +func (m *Mgr) Close() { + m.Mgr.Close() +} + +func BuildCollConfigPkg(staticCollectionConfigs []*peer.StaticCollectionConfig) *peer.CollectionConfigPackage { + if len(staticCollectionConfigs) == 0 { + return nil + } + pkg := &peer.CollectionConfigPackage{ + Config: []*peer.CollectionConfig{}, + } + for _, c := range staticCollectionConfigs { + pkg.Config = append(pkg.Config, + &peer.CollectionConfig{ + Payload: &peer.CollectionConfig_StaticCollectionConfig{ + StaticCollectionConfig: c, + }, + }, + ) + } + return pkg +} diff --git a/core/ledger/confighistory/confighistorytest/confighistory_test.go b/core/ledger/confighistory/confighistorytest/confighistory_test.go new file mode 100644 index 00000000000..0aff5216040 --- /dev/null +++ b/core/ledger/confighistory/confighistorytest/confighistory_test.go @@ -0,0 +1,107 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package confighistorytest + +import ( + "fmt" + "io/ioutil" + "math" + "os" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/stretchr/testify/require" +) + +func TestConfigHistory(t *testing.T) { + testDir, err := ioutil.TempDir("", "confighitory-") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + mgr, err := NewMgr(testDir) + require.NoError(t, err) + defer mgr.Close() + + sampleConfigHistoryNS1 := map[uint64][]*peer.StaticCollectionConfig{ + 300: {{Name: "coll1"}, {Name: "coll2"}, {Name: "coll3"}}, + 20: {{Name: "coll1"}, {Name: "coll2"}}, + } + + sampleConfigHistoryNS2 := map[uint64][]*peer.StaticCollectionConfig{ + 400: {{Name: "coll4"}}, + } + + require.NoError(t, mgr.Setup("ledger-1", "ns1", sampleConfigHistoryNS1)) + require.NoError(t, mgr.Setup("ledger-1", "ns2", sampleConfigHistoryNS2)) + + r := mgr.GetRetriever("ledger-1") + + testcases := []struct { + inputNS string + inputBlkNum uint64 + + outputConfig *peer.CollectionConfigPackage + outputBlkNum uint64 + }{ + { + inputNS: "ns1", + inputBlkNum: math.MaxUint64, + outputConfig: BuildCollConfigPkg(sampleConfigHistoryNS1[300]), + outputBlkNum: 300, + }, + + { + inputNS: "ns1", + inputBlkNum: 300, + outputConfig: BuildCollConfigPkg(sampleConfigHistoryNS1[20]), + outputBlkNum: 20, + }, + + { + inputNS: "ns1", + inputBlkNum: 20, + outputConfig: nil, + outputBlkNum: 0, + }, + + { + inputNS: "ns2", + inputBlkNum: math.MaxUint64, + outputConfig: BuildCollConfigPkg(sampleConfigHistoryNS2[400]), + outputBlkNum: 400, + }, + + { + inputNS: "ns2", + inputBlkNum: 200, + outputConfig: nil, + outputBlkNum: 0, + }, + } + + for i, c := range testcases { + t.Run(fmt.Sprintf("testcase-%d", i), func(t *testing.T) { + collectionConfigInfo, err := r.MostRecentCollectionConfigBelow(c.inputBlkNum, c.inputNS) + require.NoError(t, err) + + if c.outputConfig == nil { + require.Nil(t, c.outputConfig) + require.Equal(t, uint64(0), c.outputBlkNum) + return + } + + require.Equal(t, c.outputBlkNum, collectionConfigInfo.CommittingBlockNum) + require.True(t, + proto.Equal( + collectionConfigInfo.CollectionConfig, + c.outputConfig, + ), + ) + }) + } +} diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index 3b0bc862821..8861827e98a 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -681,6 +681,8 @@ type ChaincodeLifecycleDetails struct { type MembershipInfoProvider interface { // AmMemberOf checks whether the current peer is a member of the given collection AmMemberOf(channelName string, collectionPolicyConfig *peer.CollectionPolicyConfig) (bool, error) + // MyImplicitCollectionName returns the name of the implicit collection for the current peer + MyImplicitCollectionName() string } type HealthCheckRegistry interface { diff --git a/core/ledger/mock/membership_info_provider.go b/core/ledger/mock/membership_info_provider.go index 609cf6bb257..b53c07fa898 100644 --- a/core/ledger/mock/membership_info_provider.go +++ b/core/ledger/mock/membership_info_provider.go @@ -23,6 +23,16 @@ type MembershipInfoProvider struct { result1 bool result2 error } + MyImplicitCollectionNameStub func() string + myImplicitCollectionNameMutex sync.RWMutex + myImplicitCollectionNameArgsForCall []struct { + } + myImplicitCollectionNameReturns struct { + result1 string + } + myImplicitCollectionNameReturnsOnCall map[int]struct { + result1 string + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -91,11 +101,65 @@ func (fake *MembershipInfoProvider) AmMemberOfReturnsOnCall(i int, result1 bool, }{result1, result2} } +func (fake *MembershipInfoProvider) MyImplicitCollectionName() string { + fake.myImplicitCollectionNameMutex.Lock() + ret, specificReturn := fake.myImplicitCollectionNameReturnsOnCall[len(fake.myImplicitCollectionNameArgsForCall)] + fake.myImplicitCollectionNameArgsForCall = append(fake.myImplicitCollectionNameArgsForCall, struct { + }{}) + fake.recordInvocation("MyImplicitCollectionName", []interface{}{}) + fake.myImplicitCollectionNameMutex.Unlock() + if fake.MyImplicitCollectionNameStub != nil { + return fake.MyImplicitCollectionNameStub() + } + if specificReturn { + return ret.result1 + } + fakeReturns := fake.myImplicitCollectionNameReturns + return fakeReturns.result1 +} + +func (fake *MembershipInfoProvider) MyImplicitCollectionNameCallCount() int { + fake.myImplicitCollectionNameMutex.RLock() + defer fake.myImplicitCollectionNameMutex.RUnlock() + return len(fake.myImplicitCollectionNameArgsForCall) +} + +func (fake *MembershipInfoProvider) MyImplicitCollectionNameCalls(stub func() string) { + fake.myImplicitCollectionNameMutex.Lock() + defer fake.myImplicitCollectionNameMutex.Unlock() + fake.MyImplicitCollectionNameStub = stub +} + +func (fake *MembershipInfoProvider) MyImplicitCollectionNameReturns(result1 string) { + fake.myImplicitCollectionNameMutex.Lock() + defer fake.myImplicitCollectionNameMutex.Unlock() + fake.MyImplicitCollectionNameStub = nil + fake.myImplicitCollectionNameReturns = struct { + result1 string + }{result1} +} + +func (fake *MembershipInfoProvider) MyImplicitCollectionNameReturnsOnCall(i int, result1 string) { + fake.myImplicitCollectionNameMutex.Lock() + defer fake.myImplicitCollectionNameMutex.Unlock() + fake.MyImplicitCollectionNameStub = nil + if fake.myImplicitCollectionNameReturnsOnCall == nil { + fake.myImplicitCollectionNameReturnsOnCall = make(map[int]struct { + result1 string + }) + } + fake.myImplicitCollectionNameReturnsOnCall[i] = struct { + result1 string + }{result1} +} + func (fake *MembershipInfoProvider) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() fake.amMemberOfMutex.RLock() defer fake.amMemberOfMutex.RUnlock() + fake.myImplicitCollectionNameMutex.RLock() + defer fake.myImplicitCollectionNameMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/core/ledger/pvtdatapolicy/btlpolicy.go b/core/ledger/pvtdatapolicy/btlpolicy.go index f3792fd994d..d08d84999d6 100644 --- a/core/ledger/pvtdatapolicy/btlpolicy.go +++ b/core/ledger/pvtdatapolicy/btlpolicy.go @@ -79,11 +79,15 @@ func (p *LSCCBasedBTLPolicy) GetExpiringBlock(namesapce string, collection strin if err != nil { return 0, err } + return ComputeExpiringBlock(namesapce, collection, committingBlock, btl), nil +} + +func ComputeExpiringBlock(namesapce, collection string, committingBlock, btl uint64) uint64 { expiryBlk := committingBlock + btl + uint64(1) if expiryBlk <= committingBlock { // committingBlk + btl overflows uint64-max expiryBlk = math.MaxUint64 } - return expiryBlk, nil + return expiryBlk } type collectionInfoProvider interface { diff --git a/core/ledger/pvtdatastorage/helper.go b/core/ledger/pvtdatastorage/helper.go index ec5ec5b430d..15366d3a85e 100644 --- a/core/ledger/pvtdatastorage/helper.go +++ b/core/ledger/pvtdatastorage/helper.go @@ -159,13 +159,13 @@ func getOrCreateExpiryData(mapByExpiringBlk map[uint64]*ExpiryData, expiringBlk return expiryData } -// deriveKeys constructs dataKeys and missingDataKey from an expiryEntry -func deriveKeys(expiryEntry *expiryEntry) ([]*dataKey, []*missingDataKey) { +func deriveKeys(expiryEntry *expiryEntry) ([]*dataKey, []*missingDataKey, []*bootKVHashesKey) { var dataKeys []*dataKey var missingDataKeys []*missingDataKey + var bootKVHashesKeys []*bootKVHashesKey for ns, colls := range expiryEntry.value.Map { - for coll, txNums := range colls.Map { + for coll, txNums := range colls.PresentData { for _, txNum := range txNums.List { dataKeys = append(dataKeys, &dataKey{ @@ -179,7 +179,7 @@ func deriveKeys(expiryEntry *expiryEntry) ([]*dataKey, []*missingDataKey) { } } - for coll := range colls.MissingDataMap { + for coll := range colls.MissingData { missingDataKeys = append(missingDataKeys, &missingDataKey{ nsCollBlk: nsCollBlk{ @@ -189,9 +189,22 @@ func deriveKeys(expiryEntry *expiryEntry) ([]*dataKey, []*missingDataKey) { }, }) } + + for coll, txNums := range colls.BootKVHashes { + for _, txNum := range txNums.List { + bootKVHashesKeys = append(bootKVHashesKeys, + &bootKVHashesKey{ + blkNum: expiryEntry.key.committingBlk, + txNum: txNum, + ns: ns, + coll: coll, + }, + ) + } + } } - return dataKeys, missingDataKeys + return dataKeys, missingDataKeys, bootKVHashesKeys } func passesFilter(dataKey *dataKey, filter ledger.PvtNsCollFilter) bool { diff --git a/core/ledger/pvtdatastorage/kv_encoding.go b/core/ledger/pvtdatastorage/kv_encoding.go index a67985248c0..cd1375589d1 100644 --- a/core/ledger/pvtdatastorage/kv_encoding.go +++ b/core/ledger/pvtdatastorage/kv_encoding.go @@ -28,6 +28,7 @@ var ( collElgKeyPrefix = []byte{6} lastUpdatedOldBlocksKey = []byte{7} elgDeprioritizedMissingDataGroup = []byte{8} + bootKVHashesKeyPrefix = []byte{9} nilByte = byte(0) emptyValue = []byte{} @@ -85,7 +86,7 @@ func decodeExpiryKey(expiryKeyBytes []byte) (*expiryKey, error) { func decodeExpiryValue(expiryValueBytes []byte) (*ExpiryData, error) { expiryData := &ExpiryData{} err := proto.Unmarshal(expiryValueBytes, expiryData) - return expiryData, err + return expiryData, errors.Wrap(err, "error while decoding expiry value") } func decodeDatakey(datakeyBytes []byte) (*dataKey, error) { @@ -164,7 +165,7 @@ func encodeMissingDataValue(bitmap *bitset.BitSet) ([]byte, error) { func decodeMissingDataValue(bitmapBytes []byte) (*bitset.BitSet, error) { bitmap := &bitset.BitSet{} if err := bitmap.UnmarshalBinary(bitmapBytes); err != nil { - return nil, err + return nil, errors.Wrap(err, "error while decoding missing data value") } return bitmap, nil } @@ -190,6 +191,29 @@ func decodeCollElgVal(b []byte) (*CollElgInfo, error) { return m, nil } +func encodeBootKVHashesKey(key *bootKVHashesKey) []byte { + k := append(bootKVHashesKeyPrefix, version.NewHeight(key.blkNum, key.txNum).ToBytes()...) + k = append(k, []byte(key.ns)...) + k = append(k, nilByte) + return append(k, []byte(key.coll)...) +} + +func encodeBootKVHashesVal(val *BootKVHashes) ([]byte, error) { + b, err := proto.Marshal(val) + if err != nil { + return nil, errors.Wrap(err, "error while marshalling BootKVHashes") + } + return b, nil +} + +func decodeBootKVHashesVal(b []byte) (*BootKVHashes, error) { + val := &BootKVHashes{} + if err := proto.Unmarshal(b, val); err != nil { + return nil, errors.Wrap(err, "error while unmarshalling bytes for BootKVHashes") + } + return val, nil +} + func createRangeScanKeysForElgMissingData(blkNum uint64, group []byte) ([]byte, []byte) { startKey := append(group, encodeReverseOrderVarUint64(blkNum)...) endKey := append(group, encodeReverseOrderVarUint64(0)...) diff --git a/core/ledger/pvtdatastorage/persistent_msgs.pb.go b/core/ledger/pvtdatastorage/persistent_msgs.pb.go index f3589ab169e..218bb7cf244 100644 --- a/core/ledger/pvtdatastorage/persistent_msgs.pb.go +++ b/core/ledger/pvtdatastorage/persistent_msgs.pb.go @@ -21,10 +21,10 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type ExpiryData struct { - Map map[string]*Collections `protobuf:"bytes,1,rep,name=map,proto3" json:"map,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Map map[string]*NamespaceExpiryData `protobuf:"bytes,1,rep,name=map,proto3" json:"map,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *ExpiryData) Reset() { *m = ExpiryData{} } @@ -52,60 +52,155 @@ func (m *ExpiryData) XXX_DiscardUnknown() { var xxx_messageInfo_ExpiryData proto.InternalMessageInfo -func (m *ExpiryData) GetMap() map[string]*Collections { +func (m *ExpiryData) GetMap() map[string]*NamespaceExpiryData { if m != nil { return m.Map } return nil } -type Collections struct { +type NamespaceExpiryData struct { // for pvt data, there would be an // entry in TxNums - Map map[string]*TxNums `protobuf:"bytes,1,rep,name=map,proto3" json:"map,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + PresentData map[string]*TxNums `protobuf:"bytes,1,rep,name=presentData,proto3" json:"presentData,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` // for any number of missing pvt data of a collection, // there would be an entry in the map - MissingDataMap map[string]bool `protobuf:"bytes,2,rep,name=missingDataMap,proto3" json:"missingDataMap,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + MissingData map[string]bool `protobuf:"bytes,2,rep,name=missingData,proto3" json:"missingData,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + //entries for hashes for the pvtdata key-values (loaded from snapshot data) + BootKVHashes map[string]*TxNums `protobuf:"bytes,3,rep,name=bootKVHashes,proto3" json:"bootKVHashes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NamespaceExpiryData) Reset() { *m = NamespaceExpiryData{} } +func (m *NamespaceExpiryData) String() string { return proto.CompactTextString(m) } +func (*NamespaceExpiryData) ProtoMessage() {} +func (*NamespaceExpiryData) Descriptor() ([]byte, []int) { + return fileDescriptor_0f0cbd2d16bac879, []int{1} } -func (m *Collections) Reset() { *m = Collections{} } -func (m *Collections) String() string { return proto.CompactTextString(m) } -func (*Collections) ProtoMessage() {} -func (*Collections) Descriptor() ([]byte, []int) { - return fileDescriptor_0f0cbd2d16bac879, []int{1} +func (m *NamespaceExpiryData) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_NamespaceExpiryData.Unmarshal(m, b) +} +func (m *NamespaceExpiryData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_NamespaceExpiryData.Marshal(b, m, deterministic) +} +func (m *NamespaceExpiryData) XXX_Merge(src proto.Message) { + xxx_messageInfo_NamespaceExpiryData.Merge(m, src) } +func (m *NamespaceExpiryData) XXX_Size() int { + return xxx_messageInfo_NamespaceExpiryData.Size(m) +} +func (m *NamespaceExpiryData) XXX_DiscardUnknown() { + xxx_messageInfo_NamespaceExpiryData.DiscardUnknown(m) +} + +var xxx_messageInfo_NamespaceExpiryData proto.InternalMessageInfo -func (m *Collections) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Collections.Unmarshal(m, b) +func (m *NamespaceExpiryData) GetPresentData() map[string]*TxNums { + if m != nil { + return m.PresentData + } + return nil } -func (m *Collections) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Collections.Marshal(b, m, deterministic) + +func (m *NamespaceExpiryData) GetMissingData() map[string]bool { + if m != nil { + return m.MissingData + } + return nil } -func (m *Collections) XXX_Merge(src proto.Message) { - xxx_messageInfo_Collections.Merge(m, src) + +func (m *NamespaceExpiryData) GetBootKVHashes() map[string]*TxNums { + if m != nil { + return m.BootKVHashes + } + return nil } -func (m *Collections) XXX_Size() int { - return xxx_messageInfo_Collections.Size(m) + +type BootKVHash struct { + KeyHash []byte `protobuf:"bytes,1,opt,name=keyHash,proto3" json:"keyHash,omitempty"` + ValueHash []byte `protobuf:"bytes,2,opt,name=valueHash,proto3" json:"valueHash,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } -func (m *Collections) XXX_DiscardUnknown() { - xxx_messageInfo_Collections.DiscardUnknown(m) + +func (m *BootKVHash) Reset() { *m = BootKVHash{} } +func (m *BootKVHash) String() string { return proto.CompactTextString(m) } +func (*BootKVHash) ProtoMessage() {} +func (*BootKVHash) Descriptor() ([]byte, []int) { + return fileDescriptor_0f0cbd2d16bac879, []int{2} } -var xxx_messageInfo_Collections proto.InternalMessageInfo +func (m *BootKVHash) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_BootKVHash.Unmarshal(m, b) +} +func (m *BootKVHash) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_BootKVHash.Marshal(b, m, deterministic) +} +func (m *BootKVHash) XXX_Merge(src proto.Message) { + xxx_messageInfo_BootKVHash.Merge(m, src) +} +func (m *BootKVHash) XXX_Size() int { + return xxx_messageInfo_BootKVHash.Size(m) +} +func (m *BootKVHash) XXX_DiscardUnknown() { + xxx_messageInfo_BootKVHash.DiscardUnknown(m) +} -func (m *Collections) GetMap() map[string]*TxNums { +var xxx_messageInfo_BootKVHash proto.InternalMessageInfo + +func (m *BootKVHash) GetKeyHash() []byte { if m != nil { - return m.Map + return m.KeyHash } return nil } -func (m *Collections) GetMissingDataMap() map[string]bool { +func (m *BootKVHash) GetValueHash() []byte { + if m != nil { + return m.ValueHash + } + return nil +} + +type BootKVHashes struct { + List []*BootKVHash `protobuf:"bytes,1,rep,name=list,proto3" json:"list,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *BootKVHashes) Reset() { *m = BootKVHashes{} } +func (m *BootKVHashes) String() string { return proto.CompactTextString(m) } +func (*BootKVHashes) ProtoMessage() {} +func (*BootKVHashes) Descriptor() ([]byte, []int) { + return fileDescriptor_0f0cbd2d16bac879, []int{3} +} + +func (m *BootKVHashes) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_BootKVHashes.Unmarshal(m, b) +} +func (m *BootKVHashes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_BootKVHashes.Marshal(b, m, deterministic) +} +func (m *BootKVHashes) XXX_Merge(src proto.Message) { + xxx_messageInfo_BootKVHashes.Merge(m, src) +} +func (m *BootKVHashes) XXX_Size() int { + return xxx_messageInfo_BootKVHashes.Size(m) +} +func (m *BootKVHashes) XXX_DiscardUnknown() { + xxx_messageInfo_BootKVHashes.DiscardUnknown(m) +} + +var xxx_messageInfo_BootKVHashes proto.InternalMessageInfo + +func (m *BootKVHashes) GetList() []*BootKVHash { if m != nil { - return m.MissingDataMap + return m.List } return nil } @@ -121,7 +216,7 @@ func (m *TxNums) Reset() { *m = TxNums{} } func (m *TxNums) String() string { return proto.CompactTextString(m) } func (*TxNums) ProtoMessage() {} func (*TxNums) Descriptor() ([]byte, []int) { - return fileDescriptor_0f0cbd2d16bac879, []int{2} + return fileDescriptor_0f0cbd2d16bac879, []int{4} } func (m *TxNums) XXX_Unmarshal(b []byte) error { @@ -160,7 +255,7 @@ func (m *CollElgInfo) Reset() { *m = CollElgInfo{} } func (m *CollElgInfo) String() string { return proto.CompactTextString(m) } func (*CollElgInfo) ProtoMessage() {} func (*CollElgInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_0f0cbd2d16bac879, []int{3} + return fileDescriptor_0f0cbd2d16bac879, []int{5} } func (m *CollElgInfo) XXX_Unmarshal(b []byte) error { @@ -199,7 +294,7 @@ func (m *CollNames) Reset() { *m = CollNames{} } func (m *CollNames) String() string { return proto.CompactTextString(m) } func (*CollNames) ProtoMessage() {} func (*CollNames) Descriptor() ([]byte, []int) { - return fileDescriptor_0f0cbd2d16bac879, []int{4} + return fileDescriptor_0f0cbd2d16bac879, []int{6} } func (m *CollNames) XXX_Unmarshal(b []byte) error { @@ -229,10 +324,13 @@ func (m *CollNames) GetEntries() []string { func init() { proto.RegisterType((*ExpiryData)(nil), "pvtdatastorage.ExpiryData") - proto.RegisterMapType((map[string]*Collections)(nil), "pvtdatastorage.ExpiryData.MapEntry") - proto.RegisterType((*Collections)(nil), "pvtdatastorage.Collections") - proto.RegisterMapType((map[string]*TxNums)(nil), "pvtdatastorage.Collections.MapEntry") - proto.RegisterMapType((map[string]bool)(nil), "pvtdatastorage.Collections.MissingDataMapEntry") + proto.RegisterMapType((map[string]*NamespaceExpiryData)(nil), "pvtdatastorage.ExpiryData.MapEntry") + proto.RegisterType((*NamespaceExpiryData)(nil), "pvtdatastorage.NamespaceExpiryData") + proto.RegisterMapType((map[string]*TxNums)(nil), "pvtdatastorage.NamespaceExpiryData.BootKVHashesEntry") + proto.RegisterMapType((map[string]bool)(nil), "pvtdatastorage.NamespaceExpiryData.MissingDataEntry") + proto.RegisterMapType((map[string]*TxNums)(nil), "pvtdatastorage.NamespaceExpiryData.PresentDataEntry") + proto.RegisterType((*BootKVHash)(nil), "pvtdatastorage.BootKVHash") + proto.RegisterType((*BootKVHashes)(nil), "pvtdatastorage.BootKVHashes") proto.RegisterType((*TxNums)(nil), "pvtdatastorage.TxNums") proto.RegisterType((*CollElgInfo)(nil), "pvtdatastorage.CollElgInfo") proto.RegisterMapType((map[string]*CollNames)(nil), "pvtdatastorage.CollElgInfo.NsCollMapEntry") @@ -242,30 +340,36 @@ func init() { func init() { proto.RegisterFile("persistent_msgs.proto", fileDescriptor_0f0cbd2d16bac879) } var fileDescriptor_0f0cbd2d16bac879 = []byte{ - // 389 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x93, 0xdf, 0x6a, 0xdb, 0x30, - 0x14, 0xc6, 0xb1, 0x93, 0x65, 0xf1, 0x09, 0x84, 0xa1, 0xfd, 0xc1, 0xf3, 0x76, 0x11, 0xb2, 0x0d, - 0xc2, 0x18, 0x36, 0xcb, 0xd8, 0x08, 0xb9, 0x5b, 0xdb, 0x40, 0x7b, 0x11, 0x5f, 0xb8, 0x85, 0x40, - 0x6f, 0x8a, 0xe2, 0x28, 0x8e, 0xa8, 0x6d, 0x09, 0x49, 0x09, 0xf1, 0x9b, 0xf4, 0x31, 0xda, 0x37, - 0x2c, 0xb6, 0xf3, 0xc7, 0x0a, 0x26, 0x77, 0xf2, 0xd1, 0x77, 0x7e, 0xe7, 0x3b, 0x1f, 0x16, 0x7c, - 0xe4, 0x44, 0x48, 0x2a, 0x15, 0x49, 0xd5, 0x43, 0x22, 0x23, 0xe9, 0x72, 0xc1, 0x14, 0x43, 0x5d, - 0xbe, 0x51, 0x0b, 0xac, 0xb0, 0x54, 0x4c, 0xe0, 0x88, 0xf4, 0x9f, 0x0c, 0x80, 0xc9, 0x96, 0x53, - 0x91, 0x5d, 0x61, 0x85, 0xd1, 0x5f, 0x68, 0x24, 0x98, 0xdb, 0x46, 0xaf, 0x31, 0xe8, 0x0c, 0xbf, - 0xb9, 0xba, 0xd8, 0x3d, 0x0a, 0xdd, 0x29, 0xe6, 0x93, 0x54, 0x89, 0x2c, 0xc8, 0xf5, 0xce, 0x2d, - 0xb4, 0xf7, 0x05, 0xf4, 0x0e, 0x1a, 0x8f, 0x24, 0xb3, 0x8d, 0x9e, 0x31, 0xb0, 0x82, 0xfc, 0x88, - 0x7e, 0xc3, 0x9b, 0x0d, 0x8e, 0xd7, 0xc4, 0x36, 0x7b, 0xc6, 0xa0, 0x33, 0xfc, 0x72, 0x8a, 0xbd, - 0x64, 0x71, 0x4c, 0x42, 0x45, 0x59, 0x2a, 0x83, 0x52, 0x39, 0x36, 0x47, 0x46, 0xff, 0xc5, 0x84, - 0x4e, 0xe5, 0x0a, 0xfd, 0xab, 0x7a, 0xfb, 0x7e, 0x06, 0xa2, 0x9b, 0x43, 0x33, 0xe8, 0x26, 0x54, - 0x4a, 0x9a, 0x46, 0xb9, 0xf3, 0x29, 0xe6, 0xb6, 0x59, 0x20, 0xbc, 0xb3, 0x08, 0xad, 0xa3, 0xa4, - 0x9d, 0x60, 0x1c, 0xff, 0xec, 0xd6, 0xbf, 0xf4, 0xad, 0x3f, 0x9d, 0x4e, 0xbb, 0xdb, 0xfa, 0xeb, - 0xa4, 0xba, 0xb0, 0xf3, 0x1f, 0xde, 0xd7, 0x8c, 0xad, 0x41, 0x7f, 0xa8, 0xa2, 0xdb, 0xd5, 0xcc, - 0xbe, 0x42, 0xab, 0xe4, 0x22, 0x04, 0xcd, 0x98, 0x4a, 0x55, 0xc4, 0xd5, 0x0c, 0x8a, 0x73, 0xff, - 0xd9, 0x28, 0x13, 0x9d, 0xc4, 0xd1, 0x4d, 0xba, 0x64, 0xe8, 0x1a, 0xac, 0x54, 0xe6, 0x85, 0xe9, - 0x21, 0xd7, 0x9f, 0x75, 0xa1, 0xec, 0xf4, 0xae, 0xbf, 0x17, 0x97, 0x79, 0x1c, 0x9b, 0x9d, 0x19, - 0x74, 0xf5, 0xcb, 0x1a, 0xd7, 0x9e, 0x1e, 0xc8, 0xe7, 0xba, 0x49, 0x3e, 0x4e, 0x88, 0xf6, 0x13, - 0xfc, 0x00, 0xeb, 0x50, 0x47, 0x36, 0xbc, 0x25, 0xa9, 0x12, 0x94, 0xc8, 0xc2, 0xad, 0x15, 0xec, - 0x3f, 0x2f, 0xc6, 0xf7, 0xa3, 0x88, 0xaa, 0xd5, 0x7a, 0xee, 0x86, 0x2c, 0xf1, 0x56, 0x19, 0x27, - 0x22, 0x26, 0x8b, 0x88, 0x08, 0x6f, 0x89, 0xe7, 0x82, 0x86, 0x5e, 0xc8, 0x04, 0xf1, 0x76, 0x25, - 0x7d, 0xee, 0xbc, 0x55, 0xbc, 0x8c, 0x3f, 0xaf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xb7, 0x3e, 0x75, - 0xee, 0x32, 0x03, 0x00, 0x00, + // 481 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0xdf, 0x6a, 0xdb, 0x3e, + 0x14, 0xc7, 0x71, 0x92, 0x5f, 0x7f, 0xf5, 0x49, 0x28, 0x99, 0xf6, 0x07, 0xcf, 0xf4, 0x22, 0x78, + 0x0c, 0xca, 0x18, 0x36, 0x6c, 0x2b, 0x74, 0xbd, 0xe8, 0x45, 0xd7, 0x40, 0xc7, 0x48, 0x18, 0x66, + 0xa4, 0x6c, 0xbb, 0x18, 0x4a, 0x7a, 0xea, 0x98, 0xda, 0x96, 0x90, 0x94, 0x52, 0x3f, 0xc6, 0xde, + 0x60, 0x8f, 0xb0, 0x47, 0x1c, 0x96, 0x9d, 0x5a, 0xf6, 0x7c, 0x91, 0x8b, 0xdd, 0x49, 0xe7, 0x7c, + 0xbf, 0x9f, 0x73, 0x74, 0x24, 0x1b, 0x9e, 0x72, 0x14, 0x32, 0x96, 0x0a, 0x33, 0xf5, 0x23, 0x95, + 0x91, 0xf4, 0xb9, 0x60, 0x8a, 0x91, 0x03, 0x7e, 0xa7, 0xae, 0xa9, 0xa2, 0x52, 0x31, 0x41, 0x23, + 0xf4, 0x7e, 0x59, 0x00, 0xd3, 0x7b, 0x1e, 0x8b, 0xfc, 0x82, 0x2a, 0x4a, 0x8e, 0xa1, 0x9f, 0x52, + 0xee, 0x58, 0x93, 0xfe, 0xd1, 0xf0, 0xcd, 0x0b, 0xbf, 0x29, 0xf6, 0x6b, 0xa1, 0x3f, 0xa3, 0x7c, + 0x9a, 0x29, 0x91, 0x87, 0x85, 0xde, 0xfd, 0x0e, 0xfb, 0xdb, 0x00, 0x19, 0x43, 0xff, 0x16, 0x73, + 0xc7, 0x9a, 0x58, 0x47, 0x76, 0x58, 0x2c, 0xc9, 0x7b, 0xf8, 0xef, 0x8e, 0x26, 0x1b, 0x74, 0x7a, + 0x13, 0xab, 0x0b, 0x3b, 0xa7, 0x29, 0x4a, 0x4e, 0x57, 0x58, 0xf3, 0xc3, 0xd2, 0x71, 0xda, 0x3b, + 0xb1, 0xbc, 0x9f, 0x03, 0x78, 0xdc, 0x21, 0x21, 0x0b, 0x18, 0x72, 0x81, 0x12, 0x33, 0x55, 0x6c, + 0xab, 0x9e, 0xdf, 0xed, 0x00, 0xf7, 0x3f, 0xd7, 0xb6, 0xf2, 0x10, 0x26, 0xa8, 0xe0, 0xa6, 0xb1, + 0x94, 0x71, 0x16, 0x69, 0x6e, 0x6f, 0x77, 0xee, 0xac, 0xb6, 0x55, 0x5c, 0x03, 0x44, 0xbe, 0xc2, + 0x68, 0xc9, 0x98, 0xfa, 0xb4, 0xb8, 0xa4, 0x72, 0x8d, 0xd2, 0xe9, 0x6b, 0xf0, 0xf1, 0x2e, 0xe0, + 0x73, 0xc3, 0x57, 0x92, 0x1b, 0x28, 0x77, 0x01, 0xe3, 0xf6, 0x99, 0x3a, 0xee, 0xe1, 0x75, 0xf3, + 0x1e, 0x9e, 0xb5, 0x2b, 0x7f, 0xb9, 0x9f, 0x6f, 0x52, 0x69, 0x8c, 0xde, 0x3d, 0x83, 0x71, 0xfb, + 0x4c, 0x1d, 0xdc, 0x27, 0x26, 0x77, 0xdf, 0xf4, 0x5f, 0xc1, 0xa3, 0xbf, 0x5a, 0xff, 0x17, 0x8d, + 0x79, 0x17, 0x00, 0x35, 0x98, 0x38, 0xf0, 0xff, 0x2d, 0xe6, 0xc5, 0x52, 0x53, 0x47, 0xe1, 0x76, + 0x4b, 0x0e, 0xc1, 0xd6, 0x26, 0x9d, 0xeb, 0xe9, 0x5c, 0x1d, 0xf0, 0xce, 0x60, 0x64, 0xb6, 0x47, + 0x7c, 0x18, 0x24, 0xb1, 0x54, 0xd5, 0x53, 0x72, 0xdb, 0x6d, 0xd4, 0xda, 0x50, 0xeb, 0xbc, 0x43, + 0xd8, 0x2b, 0x5b, 0x23, 0xc4, 0x70, 0x0e, 0xaa, 0xec, 0x6f, 0x0b, 0x86, 0x1f, 0x58, 0x92, 0x4c, + 0x93, 0xe8, 0x63, 0x76, 0xc3, 0xc8, 0x25, 0xd8, 0x99, 0x2c, 0x02, 0xb3, 0x87, 0x2f, 0xec, 0x55, + 0xbb, 0x84, 0xa1, 0xf7, 0xe7, 0x5b, 0x71, 0x79, 0xe3, 0xb5, 0xd9, 0xbd, 0x82, 0x83, 0x66, 0xb2, + 0x63, 0xa6, 0x41, 0x73, 0xa6, 0xcf, 0xbb, 0x2a, 0xe9, 0xa7, 0x66, 0x8e, 0xf5, 0x25, 0xd8, 0x0f, + 0xf1, 0x62, 0xaa, 0x98, 0x29, 0x11, 0xa3, 0xd4, 0xdd, 0xda, 0xe1, 0x76, 0x7b, 0x7e, 0xfa, 0xed, + 0x24, 0x8a, 0xd5, 0x7a, 0xb3, 0xf4, 0x57, 0x2c, 0x0d, 0xd6, 0x39, 0x47, 0x91, 0xe0, 0x75, 0x84, + 0x22, 0xb8, 0xa1, 0x4b, 0x11, 0xaf, 0x82, 0x15, 0x13, 0x18, 0x54, 0xa1, 0x66, 0xdd, 0xe5, 0x9e, + 0xfe, 0x0f, 0xbd, 0xfd, 0x13, 0x00, 0x00, 0xff, 0xff, 0xa1, 0x8d, 0x56, 0x84, 0xa0, 0x04, 0x00, + 0x00, } diff --git a/core/ledger/pvtdatastorage/persistent_msgs.proto b/core/ledger/pvtdatastorage/persistent_msgs.proto index 0ca9e07fde9..deb149f4394 100644 --- a/core/ledger/pvtdatastorage/persistent_msgs.proto +++ b/core/ledger/pvtdatastorage/persistent_msgs.proto @@ -11,16 +11,27 @@ option go_package = "github.com/hyperledger/fabric/core/ledger/pvtdatastorage"; package pvtdatastorage; message ExpiryData { - map map = 1; + map map = 1; } -message Collections { +message NamespaceExpiryData { // for pvt data, there would be an // entry in TxNums - map map = 1; + map presentData = 1; // for any number of missing pvt data of a collection, // there would be an entry in the map - map missingDataMap = 2; + map missingData = 2; + //entries for hashes for the pvtdata key-values (loaded from snapshot data) + map bootKVHashes = 3; +} + +message BootKVHash { + bytes keyHash = 1; + bytes valueHash = 2; +} + +message BootKVHashes { + repeated BootKVHash list = 1; } message TxNums { diff --git a/core/ledger/pvtdatastorage/persistent_msgs_helper.go b/core/ledger/pvtdatastorage/persistent_msgs_helper.go index 89d03c165e9..2e174899341 100644 --- a/core/ledger/pvtdatastorage/persistent_msgs_helper.go +++ b/core/ledger/pvtdatastorage/persistent_msgs_helper.go @@ -7,44 +7,60 @@ SPDX-License-Identifier: Apache-2.0 package pvtdatastorage func newExpiryData() *ExpiryData { - return &ExpiryData{Map: make(map[string]*Collections)} + return &ExpiryData{Map: make(map[string]*NamespaceExpiryData)} } -func (e *ExpiryData) getOrCreateCollections(ns string) *Collections { - collections, ok := e.Map[ns] +func (e *ExpiryData) getOrCreateCollections(ns string) *NamespaceExpiryData { + nsExpiryData, ok := e.Map[ns] if !ok { - collections = &Collections{ - Map: make(map[string]*TxNums), - MissingDataMap: make(map[string]bool)} - e.Map[ns] = collections + nsExpiryData = &NamespaceExpiryData{ + PresentData: make(map[string]*TxNums), + MissingData: make(map[string]bool), + BootKVHashes: make(map[string]*TxNums), + } + e.Map[ns] = nsExpiryData } else { // due to protobuf encoding/decoding, the previously // initialized map could be a nil now due to 0 length. // Hence, we need to reinitialize the map. - if collections.Map == nil { - collections.Map = make(map[string]*TxNums) + if nsExpiryData.PresentData == nil { + nsExpiryData.PresentData = make(map[string]*TxNums) + } + if nsExpiryData.MissingData == nil { + nsExpiryData.MissingData = make(map[string]bool) } - if collections.MissingDataMap == nil { - collections.MissingDataMap = make(map[string]bool) + if nsExpiryData.BootKVHashes == nil { + nsExpiryData.BootKVHashes = make(map[string]*TxNums) } } - return collections + return nsExpiryData } func (e *ExpiryData) addPresentData(ns, coll string, txNum uint64) { - collections := e.getOrCreateCollections(ns) + nsExpiryData := e.getOrCreateCollections(ns) - txNums, ok := collections.Map[coll] + txNums, ok := nsExpiryData.PresentData[coll] if !ok { txNums = &TxNums{} - collections.Map[coll] = txNums + nsExpiryData.PresentData[coll] = txNums } txNums.List = append(txNums.List, txNum) } func (e *ExpiryData) addMissingData(ns, coll string) { - collections := e.getOrCreateCollections(ns) - collections.MissingDataMap[coll] = true + nsExpiryData := e.getOrCreateCollections(ns) + nsExpiryData.MissingData[coll] = true +} + +func (e *ExpiryData) addBootKVHash(ns, coll string, txNum uint64) { + nsExpiryData := e.getOrCreateCollections(ns) + + txNums, ok := nsExpiryData.BootKVHashes[coll] + if !ok { + txNums = &TxNums{} + nsExpiryData.BootKVHashes[coll] = txNums + } + txNums.List = append(txNums.List, txNum) } func newCollElgInfo(nsCollMap map[string][]string) *CollElgInfo { diff --git a/core/ledger/pvtdatastorage/snapshot_data_importer.go b/core/ledger/pvtdatastorage/snapshot_data_importer.go new file mode 100644 index 00000000000..67e393fa5b5 --- /dev/null +++ b/core/ledger/pvtdatastorage/snapshot_data_importer.go @@ -0,0 +1,329 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtdatastorage + +import ( + "math" + + "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" + "github.com/hyperledger/fabric/core/chaincode/implicitcollection" + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/confighistory" + "github.com/hyperledger/fabric/core/ledger/internal/version" + "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" + "github.com/pkg/errors" + "github.com/willf/bitset" +) + +type SnapshotDataImporter struct { + namespacesVisited map[string]struct{} + eligibilityAndBTLCache *eligibilityAndBTLCache + + db *leveldbhelper.DBHandle +} + +func newSnapshotDataImporter( + ledgerID string, + dbHandle *leveldbhelper.DBHandle, + membershipProvider ledger.MembershipInfoProvider, + configHistoryRetriever *confighistory.Retriever, +) *SnapshotDataImporter { + return &SnapshotDataImporter{ + namespacesVisited: map[string]struct{}{}, + eligibilityAndBTLCache: newEligibilityAndBTLCache(ledgerID, membershipProvider, configHistoryRetriever), + db: dbHandle, + } +} + +func (i *SnapshotDataImporter) ConsumeSnapshotData( + namespace, collection string, + keyHash, valueHash []byte, + version *version.Height) error { + + if _, ok := i.namespacesVisited[namespace]; !ok { + if err := i.eligibilityAndBTLCache.loadDataFor(namespace); err != nil { + return err + } + i.namespacesVisited[namespace] = struct{}{} + } + + blkNum := version.BlockNum + txNum := version.TxNum + + isEligible, err := i.eligibilityAndBTLCache.isEligibile(namespace, collection, blkNum) + if err != nil { + return err + } + + dbUpdater := &dbUpdater{ + db: i.db, + batch: i.db.NewUpdateBatch(), + } + + err = dbUpdater.upsertMissingDataEntry( + &missingDataKey{ + nsCollBlk{ + ns: namespace, + coll: collection, + blkNum: blkNum, + }, + }, + txNum, + isEligible, + ) + if err != nil { + return err + } + + err = dbUpdater.upsertBootKVHashes( + &bootKVHashesKey{ + ns: namespace, + coll: collection, + blkNum: blkNum, + txNum: txNum, + }, + keyHash, + valueHash, + ) + if err != nil { + return err + } + + hasExpiry, expiringBlk := i.eligibilityAndBTLCache.hasExpiry(namespace, collection, blkNum) + if hasExpiry { + err := dbUpdater.upsertExpiryEntry( + &expiryKey{ + committingBlk: blkNum, + expiringBlk: expiringBlk, + }, + namespace, collection, txNum, + ) + if err != nil { + return err + } + } + + return dbUpdater.commitBatch() +} + +type nsColl struct { + ns, coll string +} + +type eligibility struct { + configBlockNum uint64 + isEligible bool +} + +type eligibilityAndBTLCache struct { + ledgerID string + membershipProvider ledger.MembershipInfoProvider + configHistoryRetriever *confighistory.Retriever + + eligibilityHistory map[nsColl][]*eligibility + btl map[nsColl]uint64 +} + +func newEligibilityAndBTLCache( + ledgerID string, + membershipProvider ledger.MembershipInfoProvider, + configHistoryRetriever *confighistory.Retriever) *eligibilityAndBTLCache { + + return &eligibilityAndBTLCache{ + ledgerID: ledgerID, + membershipProvider: membershipProvider, + configHistoryRetriever: configHistoryRetriever, + eligibilityHistory: map[nsColl][]*eligibility{}, + btl: map[nsColl]uint64{}, + } +} + +func (i *eligibilityAndBTLCache) loadDataFor(namespace string) error { + var queryBlkNum uint64 = math.MaxUint64 + for { + configInfo, err := i.configHistoryRetriever.MostRecentCollectionConfigBelow(queryBlkNum, namespace) + if err != nil || configInfo == nil { + return err + } + + committingBlkNum := configInfo.CommittingBlockNum + collections := configInfo.CollectionConfig.GetConfig() + + for _, collection := range collections { + staticCollection := collection.GetStaticCollectionConfig() + eligible, err := i.membershipProvider.AmMemberOf(i.ledgerID, staticCollection.MemberOrgsPolicy) + if err != nil { + return err + } + key := nsColl{ + ns: namespace, + coll: staticCollection.Name, + } + i.eligibilityHistory[key] = append(i.eligibilityHistory[key], + &eligibility{ + configBlockNum: committingBlkNum, + isEligible: eligible, + }, + ) + if staticCollection.BlockToLive > 0 { + i.btl[key] = staticCollection.BlockToLive + } + } + queryBlkNum = committingBlkNum + } +} + +func (i *eligibilityAndBTLCache) isEligibile(namespace, collection string, dataBlockNum uint64) (bool, error) { + if implicitcollection.IsImplicitCollection(collection) { + return collection == i.membershipProvider.MyImplicitCollectionName(), nil + } + + key := nsColl{ + ns: namespace, + coll: collection, + } + history := i.eligibilityHistory[key] + + if len(history) == 0 { + return false, + errors.Errorf( + "unexpected error - no collection config history for ", + namespace, collection, + ) + } + + if dataBlockNum <= history[len(history)-1].configBlockNum { + return false, + errors.Errorf( + "unexpected error - no collection config found below block number [%d] for ", + dataBlockNum, namespace, collection, + ) + } + + for _, h := range history { + if h.configBlockNum >= dataBlockNum { + if h.isEligible { + return true, nil + } + continue + } + return h.isEligible, nil + } + + return false, errors.Errorf("unexpected code path - potential bug") +} + +func (i *eligibilityAndBTLCache) hasExpiry(namespace, collection string, committingBlk uint64) (bool, uint64) { + var expiringBlk uint64 = math.MaxUint64 + btl, ok := i.btl[nsColl{ + ns: namespace, + coll: collection, + }] + if ok { + expiringBlk = pvtdatapolicy.ComputeExpiringBlock(namespace, collection, committingBlk, btl) + } + return expiringBlk < math.MaxUint64, expiringBlk +} + +type dbUpdater struct { + db *leveldbhelper.DBHandle + batch *leveldbhelper.UpdateBatch +} + +func (u *dbUpdater) upsertMissingDataEntry(key *missingDataKey, committingTxNum uint64, isEligible bool) error { + var encKey []byte + if isEligible { + encKey = encodeElgPrioMissingDataKey(key) + } else { + encKey = encodeInelgMissingDataKey(key) + } + + encVal, err := u.db.Get(encKey) + if err != nil { + return errors.WithMessage(err, "error while getting missing data bitmap from the store") + } + + var missingData *bitset.BitSet + if encVal != nil { + if missingData, err = decodeMissingDataValue(encVal); err != nil { + return err + } + } else { + missingData = &bitset.BitSet{} + } + + missingData.Set(uint(committingTxNum)) + encVal, err = encodeMissingDataValue(missingData) + if err != nil { + return err + } + u.batch.Put(encKey, encVal) + return nil +} + +func (u *dbUpdater) upsertBootKVHashes(key *bootKVHashesKey, keyHash, valueHash []byte) error { + encKey := encodeBootKVHashesKey(key) + encVal, err := u.db.Get(encKey) + if err != nil { + return err + } + + var val *BootKVHashes + if encVal != nil { + if val, err = decodeBootKVHashesVal(encVal); err != nil { + return err + } + } else { + val = &BootKVHashes{} + } + + val.List = append(val.List, + &BootKVHash{ + KeyHash: keyHash, + ValueHash: valueHash, + }, + ) + if encVal, err = encodeBootKVHashesVal(val); err != nil { + return errors.Wrap(err, "error while marshalling BootKVHashes") + } + u.batch.Put(encKey, encVal) + return nil +} + +func (u *dbUpdater) upsertExpiryEntry( + key *expiryKey, + namesapce, collection string, + txNum uint64, +) error { + encKey := encodeExpiryKey(key) + encVal, err := u.db.Get(encKey) + if err != nil { + return err + } + + var val *ExpiryData + if encVal != nil { + if val, err = decodeExpiryValue(encVal); err != nil { + return err + } + } else { + val = newExpiryData() + } + + val.addMissingData(namesapce, collection) + val.addBootKVHash(namesapce, collection, txNum) + encVal, err = encodeExpiryValue(val) + if err != nil { + return err + } + u.batch.Put(encKey, encVal) + return nil +} + +func (u *dbUpdater) commitBatch() error { + return u.db.WriteBatch(u.batch, true) +} diff --git a/core/ledger/pvtdatastorage/snapshot_data_importer_test.go b/core/ledger/pvtdatastorage/snapshot_data_importer_test.go new file mode 100644 index 00000000000..8e3dce42124 --- /dev/null +++ b/core/ledger/pvtdatastorage/snapshot_data_importer_test.go @@ -0,0 +1,1034 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtdatastorage + +import ( + "fmt" + "io/ioutil" + "math" + "os" + "path" + "testing" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/msp" + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" + "github.com/hyperledger/fabric/core/chaincode/implicitcollection" + "github.com/hyperledger/fabric/core/ledger/confighistory/confighistorytest" + "github.com/hyperledger/fabric/core/ledger/internal/version" + "github.com/hyperledger/fabric/core/ledger/mock" + "github.com/stretchr/testify/require" + "github.com/willf/bitset" +) + +func TestSnapshotImporter(t *testing.T) { + ledgerID := "test-ledger" + myMSPID := "myOrg" + + setup := func() (*SnapshotDataImporter, *confighistorytest.Mgr, *dbEntriesVerifier) { + testDir := testDir(t) + t.Cleanup(func() { os.RemoveAll(testDir) }) + dbProvider, err := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: testDir}) + require.NoError(t, err) + t.Cleanup(func() { dbProvider.Close() }) + + configHistoryMgr, err := confighistorytest.NewMgr(path.Join(testDir, "config-history")) + require.NoError(t, err) + t.Cleanup(func() { configHistoryMgr.Close() }) + + db := dbProvider.GetDBHandle(ledgerID) + + return newSnapshotDataImporter( + ledgerID, + db, + newMockMembershipProvider(myMSPID), + configHistoryMgr.GetRetriever(ledgerID), + ), + configHistoryMgr, + &dbEntriesVerifier{t, db} + } + + t.Run("coll-eligible-and-never-expires", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr, dbVerifier := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + BlockToLive: 0, + }, + }, + }, + ) + require.NoError(t, err) + + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.NoError(t, err) + + dbVerifier.verifyBootKVHashesEntry( + &bootKVHashesKey{blkNum: 20, txNum: 300, ns: "ns", coll: "coll"}, + &BootKVHashes{ + List: []*BootKVHash{ + {KeyHash: []byte("key-hash"), ValueHash: []byte("value-hash")}, + }, + }, + ) + + dbVerifier.verifyMissingDataEntry( + &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: 20}}, + true, + (&bitset.BitSet{}).Set(300), + ) + + dbVerifier.verifyNoExpiryEntries() + }) + + t.Run("coll-ineligible-and-never-expires", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr, dbVerifier := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamOut.toMemberOrgPolicy(), + BlockToLive: 0, + }, + }, + }, + ) + require.NoError(t, err) + + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.NoError(t, err) + + dbVerifier.verifyBootKVHashesEntry( + &bootKVHashesKey{blkNum: 20, txNum: 300, ns: "ns", coll: "coll"}, + &BootKVHashes{ + List: []*BootKVHash{ + {KeyHash: []byte("key-hash"), ValueHash: []byte("value-hash")}, + }, + }, + ) + + dbVerifier.verifyMissingDataEntry( + &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: 20}}, + false, + (&bitset.BitSet{}).Set(300), + ) + + dbVerifier.verifyNoExpiryEntries() + }) + + t.Run("coll-eligible-and-expires", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr, dbVerifier := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + BlockToLive: 30, + }, + }, + }, + ) + require.NoError(t, err) + + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.NoError(t, err) + + dbVerifier.verifyBootKVHashesEntry( + &bootKVHashesKey{blkNum: 20, txNum: 300, ns: "ns", coll: "coll"}, + &BootKVHashes{ + List: []*BootKVHash{ + {KeyHash: []byte("key-hash"), ValueHash: []byte("value-hash")}, + }, + }, + ) + + dbVerifier.verifyMissingDataEntry( + &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: 20}}, + true, + (&bitset.BitSet{}).Set(300), + ) + + dbVerifier.verifyExpiryEntry( + &expiryKey{committingBlk: 20, expiringBlk: 20 + 30 + 1}, + &ExpiryData{ + Map: map[string]*NamespaceExpiryData{ + "ns": { + MissingData: map[string]bool{ + "coll": true, + }, + BootKVHashes: map[string]*TxNums{ + "coll": {List: []uint64{300}}, + }, + }, + }, + }, + ) + }) + + t.Run("coll-ineligible-and-expires", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr, dbVerifier := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamOut.toMemberOrgPolicy(), + BlockToLive: 30, + }, + }, + }, + ) + require.NoError(t, err) + + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.NoError(t, err) + + dbVerifier.verifyBootKVHashesEntry( + &bootKVHashesKey{blkNum: 20, txNum: 300, ns: "ns", coll: "coll"}, + &BootKVHashes{ + List: []*BootKVHash{ + {KeyHash: []byte("key-hash"), ValueHash: []byte("value-hash")}, + }, + }, + ) + + dbVerifier.verifyMissingDataEntry( + &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: 20}}, + false, + (&bitset.BitSet{}).Set(300), + ) + + dbVerifier.verifyExpiryEntry( + &expiryKey{committingBlk: 20, expiringBlk: 20 + 30 + 1}, + &ExpiryData{ + Map: map[string]*NamespaceExpiryData{ + "ns": { + MissingData: map[string]bool{ + "coll": true, + }, + BootKVHashes: map[string]*TxNums{ + "coll": {List: []uint64{300}}, + }, + }, + }, + }, + ) + }) + + t.Run("implicit-coll-for-my-org", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr, dbVerifier := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{}, + ) + require.NoError(t, err) + + myImplicitColl := implicitcollection.NameForOrg(myMSPID) + err = snapshotDataImporter.ConsumeSnapshotData("ns", myImplicitColl, + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.NoError(t, err) + + dbVerifier.verifyBootKVHashesEntry( + &bootKVHashesKey{blkNum: 20, txNum: 300, ns: "ns", coll: myImplicitColl}, + &BootKVHashes{ + List: []*BootKVHash{ + {KeyHash: []byte("key-hash"), ValueHash: []byte("value-hash")}, + }, + }, + ) + + dbVerifier.verifyMissingDataEntry( + &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns", coll: myImplicitColl, blkNum: 20}}, + true, + (&bitset.BitSet{}).Set(300), + ) + + dbVerifier.verifyNoExpiryEntries() + }) + + t.Run("implicit-coll-not-for-my-org", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr, dbVerifier := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{}, + ) + require.NoError(t, err) + + otherOrgImplicitColl := implicitcollection.NameForOrg("SomeOtherOrg") + err = snapshotDataImporter.ConsumeSnapshotData("ns", otherOrgImplicitColl, + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.NoError(t, err) + + dbVerifier.verifyBootKVHashesEntry( + &bootKVHashesKey{blkNum: 20, txNum: 300, ns: "ns", coll: otherOrgImplicitColl}, + &BootKVHashes{ + List: []*BootKVHash{ + {KeyHash: []byte("key-hash"), ValueHash: []byte("value-hash")}, + }, + }, + ) + + dbVerifier.verifyMissingDataEntry( + &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns", coll: otherOrgImplicitColl, blkNum: 20}}, + false, + (&bitset.BitSet{}).Set(300), + ) + + dbVerifier.verifyNoExpiryEntries() + }) + + t.Run("loads-collection-config-into-cache-only-once", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr, _ := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamOut.toMemberOrgPolicy(), + BlockToLive: 30, + }, + }, + }, + ) + require.NoError(t, err) + + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.NoError(t, err) + + _, ok := snapshotDataImporter.namespacesVisited["ns"] + require.True(t, ok) + + snapshotDataImporter.eligibilityAndBTLCache.eligibilityHistory[nsColl{"ns", "coll"}] = nil + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash-1"), []byte("value-hash-1"), + version.NewHeight(21, 300), + ) + require.EqualError(t, err, "unexpected error - no collection config history for ") + }) +} + +func TestSnapshotImporterErrorPropagation(t *testing.T) { + ledgerID := "test-ledger" + myMSPID := "myOrg" + + setup := func() (*SnapshotDataImporter, *confighistorytest.Mgr) { + testDir := testDir(t) + t.Cleanup(func() { os.RemoveAll(testDir) }) + dbProvider, err := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: testDir}) + require.NoError(t, err) + t.Cleanup(func() { dbProvider.Close() }) + + configHistoryMgr, err := confighistorytest.NewMgr(path.Join(testDir, "config-history")) + require.NoError(t, err) + t.Cleanup(func() { configHistoryMgr.Close() }) + + db := dbProvider.GetDBHandle(ledgerID) + + return newSnapshotDataImporter( + ledgerID, + db, + newMockMembershipProvider(myMSPID), + configHistoryMgr.GetRetriever(ledgerID), + ), + configHistoryMgr + } + + t.Run("expect-error-when-no-coll-config-not-present-for-namespace", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{}, + ) + require.NoError(t, err) + + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.EqualError(t, err, "unexpected error - no collection config history for ") + }) + + t.Run("expect-error-when-coll-config-below-data-item-not-present", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamOut.toMemberOrgPolicy(), + }, + }, + }, + ) + require.NoError(t, err) + + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(10, 300), + ) + require.EqualError(t, err, "unexpected error - no collection config found below block number [10] for ") + }) + + t.Run("error-when-membershipProvider-returns error", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + BlockToLive: 30, + }, + }, + }, + ) + require.NoError(t, err) + + snapshotDataImporter.eligibilityAndBTLCache.membershipProvider.(*mock.MembershipInfoProvider).AmMemberOfReturns(false, fmt.Errorf("membership-error")) + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.EqualError(t, err, "membership-error") + }) + + t.Run("error-during-decoding-existing-missing-data-entry", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + BlockToLive: 30, + }, + }, + }, + ) + require.NoError(t, err) + err = snapshotDataImporter.db.Put( + encodeElgPrioMissingDataKey( + &missingDataKey{nsCollBlk: nsCollBlk{blkNum: 20, ns: "ns", coll: "coll"}}, + ), + []byte("garbage-value"), + false, + ) + require.NoError(t, err) + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.Contains(t, err.Error(), "error while decoding missing data value") + }) + + t.Run("error-during-decoding-existing-bootKVHashes", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + BlockToLive: 30, + }, + }, + }, + ) + require.NoError(t, err) + err = snapshotDataImporter.db.Put( + encodeBootKVHashesKey( + &bootKVHashesKey{blkNum: 20, txNum: 300, ns: "ns", coll: "coll"}, + ), + []byte("garbage-value"), + false, + ) + require.NoError(t, err) + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.Contains(t, err.Error(), "error while unmarshalling bytes for BootKVHashes") + }) + + t.Run("error-during-decoding-existing-expiryEntry", func(t *testing.T) { + snapshotDataImporter, configHistoryMgr := setup() + err := configHistoryMgr.Setup( + ledgerID, "ns", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + BlockToLive: 30, + }, + }, + }, + ) + require.NoError(t, err) + err = snapshotDataImporter.db.Put( + encodeExpiryKey( + &expiryKey{committingBlk: 20, expiringBlk: 51}, + ), + []byte("garbage-value"), + false, + ) + require.NoError(t, err) + err = snapshotDataImporter.ConsumeSnapshotData("ns", "coll", + []byte("key-hash"), []byte("value-hash"), + version.NewHeight(20, 300), + ) + require.Contains(t, err.Error(), "error while decoding expiry value") + }) +} + +func TestEligibilityAndBTLCacheLoadData(t *testing.T) { + testDir := testDir(t) + defer os.RemoveAll(testDir) + + configHistoryMgr, err := confighistorytest.NewMgr(testDir) + require.NoError(t, err) + defer configHistoryMgr.Close() + + // setup a sample config history for namespace1 + err = configHistoryMgr.Setup("test-ledger", "namespace1", + map[uint64][]*peer.StaticCollectionConfig{ + 15: { + { + Name: "coll1", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + BlockToLive: 0, + }, + { + Name: "coll2", + MemberOrgsPolicy: iamOut.toMemberOrgPolicy(), + BlockToLive: 100, + }, + { + Name: "coll3", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + BlockToLive: 500, + }, + }, + + 10: { + { + Name: "coll1", + MemberOrgsPolicy: iamOut.toMemberOrgPolicy(), + }, + { + Name: "coll2", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + }, + }, + + 5: { + { + Name: "coll1", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + }, + }, + }, + ) + require.NoError(t, err) + + // setup a sample config history for namespace2 + err = configHistoryMgr.Setup("test-ledger", "namespace2", + map[uint64][]*peer.StaticCollectionConfig{ + 50: { + { + Name: "coll1", + MemberOrgsPolicy: iamIn.toMemberOrgPolicy(), + BlockToLive: 300, + }, + }, + }, + ) + require.NoError(t, err) + + eligibilityAndBTLCache := eligibilityAndBTLCache{ + membershipProvider: newMockMembershipProvider("myOrg"), + configHistoryRetriever: configHistoryMgr.GetRetriever("test-ledger"), + eligibilityHistory: map[nsColl][]*eligibility{}, + btl: map[nsColl]uint64{}, + } + + err = eligibilityAndBTLCache.loadDataFor("namespace1") + require.NoError(t, err) + err = eligibilityAndBTLCache.loadDataFor("namespace2") + require.NoError(t, err) + + require.Equal( + t, + map[nsColl]uint64{ + {ns: "namespace1", coll: "coll2"}: 100, + {"namespace1", "coll3"}: 500, + {"namespace2", "coll1"}: 300, + }, + eligibilityAndBTLCache.btl, + ) + + require.Equal( + t, + map[nsColl][]*eligibility{ + {ns: "namespace1", coll: "coll1"}: {{configBlockNum: 15, isEligible: true}, {10, false}, {5, true}}, + {"namespace1", "coll2"}: {{15, false}, {10, true}}, + {"namespace1", "coll3"}: {{15, true}}, + {"namespace2", "coll1"}: {{50, true}}, + }, + eligibilityAndBTLCache.eligibilityHistory, + ) +} + +func TestEligibilityAndBTLCacheEligibleExplicitCollection(t *testing.T) { + eligibilityAndBTLCache := eligibilityAndBTLCache{ + membershipProvider: newMockMembershipProvider("myOrg"), + } + + t.Run("ineligible-then-eligible", func(t *testing.T) { + eligibilityAndBTLCache.eligibilityHistory = map[nsColl][]*eligibility{ + {ns: "ns", coll: "coll"}: { + {configBlockNum: 10, isEligible: true}, + {configBlockNum: 5, isEligible: false}, + }, + } + + for i := 6; i < 15; i++ { + eligible, err := eligibilityAndBTLCache.isEligibile("ns", "coll", uint64(i)) + require.NoError(t, err) + require.True(t, eligible) + } + }) + + t.Run("ineligible-eligible-again-ineligible", func(t *testing.T) { + eligibilityAndBTLCache.eligibilityHistory = map[nsColl][]*eligibility{ + {ns: "ns", coll: "coll"}: { + {configBlockNum: 20, isEligible: false}, + {configBlockNum: 10, isEligible: true}, + {configBlockNum: 5, isEligible: false}, + }, + } + + for i := 6; i <= 20; i++ { + eligible, err := eligibilityAndBTLCache.isEligibile("ns", "coll", uint64(i)) + require.NoError(t, err) + require.True(t, eligible) + } + for i := 21; i < 25; i++ { + eligible, err := eligibilityAndBTLCache.isEligibile("ns", "coll", uint64(i)) + require.NoError(t, err) + require.False(t, eligible) + } + }) + + t.Run("eligible-then-ineligible", func(t *testing.T) { + eligibilityAndBTLCache.eligibilityHistory = map[nsColl][]*eligibility{ + {ns: "ns", coll: "coll"}: { + {configBlockNum: 10, isEligible: false}, + {configBlockNum: 5, isEligible: true}, + }, + } + + for i := 6; i <= 10; i++ { + eligible, err := eligibilityAndBTLCache.isEligibile("ns", "coll", uint64(i)) + require.NoError(t, err) + require.True(t, eligible) + } + + for i := 11; i < 15; i++ { + eligible, err := eligibilityAndBTLCache.isEligibile("ns", "coll", uint64(i)) + require.NoError(t, err) + require.False(t, eligible) + } + }) + + t.Run("eligible-ineligible-again-eligible", func(t *testing.T) { + eligibilityAndBTLCache.eligibilityHistory = map[nsColl][]*eligibility{ + {ns: "ns", coll: "coll"}: { + {configBlockNum: 20, isEligible: true}, + {configBlockNum: 10, isEligible: false}, + {configBlockNum: 5, isEligible: true}, + }, + } + + for i := 6; i < 25; i++ { + eligible, err := eligibilityAndBTLCache.isEligibile("ns", "coll", uint64(i)) + require.NoError(t, err) + require.True(t, eligible) + } + }) +} + +func TestEligibilityAndBTLCacheDataExpiry(t *testing.T) { + t.Run("data-expires", func(t *testing.T) { + eligibilityAndBTLCache := eligibilityAndBTLCache{ + btl: map[nsColl]uint64{ + {ns: "ns", coll: "coll"}: 25, + }, + } + expires, expiringBlk := eligibilityAndBTLCache.hasExpiry("ns", "coll", 100) + require.True(t, expires) + require.Equal(t, uint64(100+25+1), expiringBlk) + }) + + t.Run("data-does-not-expires", func(t *testing.T) { + eligibilityAndBTLCache := eligibilityAndBTLCache{} + expires, expiringBlk := eligibilityAndBTLCache.hasExpiry("ns", "coll", 100) + require.False(t, expires) + require.Equal(t, uint64(math.MaxUint64), expiringBlk) + }) + +} + +func newMockMembershipProvider(myMspID string) *mock.MembershipInfoProvider { + p := &mock.MembershipInfoProvider{} + p.MyImplicitCollectionNameReturns(implicitcollection.NameForOrg("myOrg")) + p.AmMemberOfStub = func(namespace string, config *peer.CollectionPolicyConfig) (bool, error) { + return iamIn.sameAs(config), nil + } + return p +} + +type eligibilityVal uint8 + +const ( + iamIn eligibilityVal = iota + iamOut +) + +func (e eligibilityVal) toMemberOrgPolicy() *peer.CollectionPolicyConfig { + return &peer.CollectionPolicyConfig{ + Payload: &peer.CollectionPolicyConfig_SignaturePolicy{ + SignaturePolicy: &common.SignaturePolicyEnvelope{ + Identities: []*msp.MSPPrincipal{ + { + Principal: []byte{byte(e)}, + }, + }, + }, + }, + } +} + +func (e eligibilityVal) sameAs(p *peer.CollectionPolicyConfig) bool { + return e == eligibilityVal(p.GetSignaturePolicy().Identities[0].Principal[0]) +} + +func TestDBUpdator(t *testing.T) { + setup := func() (*dbUpdater, *dbEntriesVerifier, *leveldbhelper.Provider) { + testDir := testDir(t) + t.Cleanup(func() { os.RemoveAll(testDir) }) + + p, err := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: testDir}) + require.NoError(t, err) + t.Cleanup(func() { p.Close() }) + db := p.GetDBHandle("test-ledger") + batch := db.NewUpdateBatch() + return &dbUpdater{ + db: db, + batch: batch, + }, + &dbEntriesVerifier{ + t: t, + db: db, + }, + p + } + + t.Run("upsert-missing-data-entry-eligible-data", func(t *testing.T) { + dbUpdater, verifier, _ := setup() + missingDataKey := &missingDataKey{ + nsCollBlk{ + ns: "ns-1", + coll: "coll-1", + blkNum: 1, + }, + } + require.NoError(t, + dbUpdater.upsertMissingDataEntry(missingDataKey, 10, true), + ) + require.NoError(t, + dbUpdater.commitBatch(), + ) + verifier.verifyMissingDataEntry(missingDataKey, true, (&bitset.BitSet{}).Set(10)) + + require.NoError(t, + dbUpdater.upsertMissingDataEntry(missingDataKey, 50, true), + ) + require.NoError(t, + dbUpdater.commitBatch(), + ) + + verifier.verifyMissingDataEntry(missingDataKey, true, (&bitset.BitSet{}).Set(10).Set(50)) + }) + + t.Run("upsert-missing-data-entry-ineligible-data", func(t *testing.T) { + dbUpdater, verifier, _ := setup() + missingDataKey := &missingDataKey{ + nsCollBlk{ + ns: "ns-1", + coll: "coll-1", + blkNum: 1, + }, + } + + require.NoError(t, + dbUpdater.upsertMissingDataEntry(missingDataKey, 10, false), + ) + require.NoError(t, + dbUpdater.commitBatch(), + ) + + verifier.verifyMissingDataEntry(missingDataKey, false, (&bitset.BitSet{}).Set(10)) + + require.NoError(t, + dbUpdater.upsertMissingDataEntry(missingDataKey, 50, false), + ) + require.NoError(t, + dbUpdater.commitBatch(), + ) + + verifier.verifyMissingDataEntry(missingDataKey, false, (&bitset.BitSet{}).Set(10).Set(50)) + }) + + t.Run("upsert-bootKVHashes", func(t *testing.T) { + bootKVHashesKey := &bootKVHashesKey{ + blkNum: 1, + txNum: 2, + ns: "ns-1", + coll: "coll-1", + } + dbUpdater, verifier, _ := setup() + require.NoError(t, + dbUpdater.upsertBootKVHashes(bootKVHashesKey, []byte("key-hash"), []byte("value-hash")), + ) + require.NoError(t, + dbUpdater.commitBatch(), + ) + + verifier.verifyBootKVHashesEntry(bootKVHashesKey, + &BootKVHashes{ + List: []*BootKVHash{ + { + KeyHash: []byte("key-hash"), + ValueHash: []byte("value-hash"), + }, + }, + }) + + require.NoError(t, + dbUpdater.upsertBootKVHashes(bootKVHashesKey, []byte("another-key-hash"), []byte("another-value-hash")), + ) + require.NoError(t, + dbUpdater.commitBatch(), + ) + + verifier.verifyBootKVHashesEntry(bootKVHashesKey, + &BootKVHashes{ + List: []*BootKVHash{ + { + KeyHash: []byte("key-hash"), + ValueHash: []byte("value-hash"), + }, + { + KeyHash: []byte("another-key-hash"), + ValueHash: []byte("another-value-hash"), + }, + }, + }) + }) + + t.Run("upsert-expiryEntry", func(t *testing.T) { + expiryKey := &expiryKey{ + committingBlk: 2, + expiringBlk: 5, + } + dbUpdater, verifier, _ := setup() + require.NoError(t, + dbUpdater.upsertExpiryEntry(expiryKey, "ns-1", "coll-1", 10), + ) + require.NoError(t, + dbUpdater.commitBatch(), + ) + + verifier.verifyExpiryEntry(expiryKey, + &ExpiryData{ + Map: map[string]*NamespaceExpiryData{ + "ns-1": { + MissingData: map[string]bool{ + "coll-1": true, + }, + BootKVHashes: map[string]*TxNums{ + "coll-1": {List: []uint64{10}}, + }, + }, + }, + }, + ) + + require.NoError(t, + dbUpdater.upsertExpiryEntry(expiryKey, "ns-1", "coll-1", 11), + ) + require.NoError(t, + dbUpdater.commitBatch(), + ) + + verifier.verifyExpiryEntry(expiryKey, + &ExpiryData{ + Map: map[string]*NamespaceExpiryData{ + "ns-1": { + MissingData: map[string]bool{ + "coll-1": true, + }, + BootKVHashes: map[string]*TxNums{ + "coll-1": {List: []uint64{10, 11}}, + }, + }, + }, + }, + ) + + require.NoError(t, + dbUpdater.upsertExpiryEntry(expiryKey, "ns-1", "coll-2", 12), + ) + require.NoError(t, + dbUpdater.commitBatch(), + ) + + verifier.verifyExpiryEntry(expiryKey, + &ExpiryData{ + Map: map[string]*NamespaceExpiryData{ + "ns-1": { + MissingData: map[string]bool{ + "coll-1": true, + "coll-2": true, + }, + BootKVHashes: map[string]*TxNums{ + "coll-1": {List: []uint64{10, 11}}, + "coll-2": {List: []uint64{12}}, + }, + }, + }, + }, + ) + }) + + t.Run("error-propagation", func(t *testing.T) { + dbUpdater, _, dbProvider := setup() + dbProvider.Close() + + err := dbUpdater.upsertBootKVHashes( + &bootKVHashesKey{blkNum: 20, txNum: 20, ns: "nil", coll: ""}, + nil, + nil, + ) + require.Contains(t, err.Error(), "leveldb: closed") + + err = dbUpdater.upsertMissingDataEntry( + &missingDataKey{nsCollBlk: nsCollBlk{blkNum: 20, ns: "", coll: ""}}, + 20, + true, + ) + require.Contains(t, err.Error(), "leveldb: closed") + + err = dbUpdater.upsertExpiryEntry( + &expiryKey{committingBlk: 20, expiringBlk: 10}, + "", + "", + 20, + ) + require.Contains(t, err.Error(), "leveldb: closed") + + dbUpdater.batch.Put([]byte("key"), []byte("value")) + err = dbUpdater.commitBatch() + require.Contains(t, err.Error(), "leveldb: closed") + }) +} + +type dbEntriesVerifier struct { + t *testing.T + db *leveldbhelper.DBHandle +} + +func (v *dbEntriesVerifier) verifyMissingDataEntry(key *missingDataKey, isEligibile bool, expectedVal *bitset.BitSet) { + var k []byte + if isEligibile { + k = encodeElgPrioMissingDataKey(key) + } else { + k = encodeInelgMissingDataKey(key) + } + + valEnc, err := v.db.Get(k) + require.NoError(v.t, err) + val, err := decodeMissingDataValue(valEnc) + require.NoError(v.t, err) + require.Equal(v.t, expectedVal, val) +} + +func (v *dbEntriesVerifier) verifyBootKVHashesEntry(key *bootKVHashesKey, expectedVal *BootKVHashes) { + encVal, err := v.db.Get(encodeBootKVHashesKey(key)) + require.NoError(v.t, err) + val, err := decodeBootKVHashesVal(encVal) + require.NoError(v.t, err) + require.Equal(v.t, expectedVal, val) +} + +func (v *dbEntriesVerifier) verifyExpiryEntry(key *expiryKey, expectedVal *ExpiryData) { + encVal, err := v.db.Get(encodeExpiryKey(key)) + require.NoError(v.t, err) + val, err := decodeExpiryValue(encVal) + require.NoError(v.t, err) + require.Equal(v.t, expectedVal, val) +} + +func (v *dbEntriesVerifier) verifyNoExpiryEntries() { + iter, err := v.db.GetIterator(expiryKeyPrefix, append(expiryKeyPrefix, byte(0))) + defer iter.Release() + + require.NoError(v.t, err) + require.False(v.t, iter.Next()) + require.NoError(v.t, iter.Error()) +} + +func testDir(t *testing.T) string { + dir, err := ioutil.TempDir("", "snapshot-data-importer-") + require.NoError(t, err) + return dir +} diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index e6ec09403a3..30f8532c690 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -106,6 +106,13 @@ type missingDataKey struct { nsCollBlk } +type bootKVHashesKey struct { + blkNum uint64 + txNum uint64 + ns string + coll string +} + type storeEntries struct { dataEntries []*dataEntry expiryEntries []*expiryEntry @@ -559,7 +566,7 @@ func (s *Store) purgeExpiredData(minBlkNum, maxBlkNum uint64) error { batch := s.db.NewUpdateBatch() for _, expiryEntry := range expiryEntries { batch.Delete(encodeExpiryKey(expiryEntry.key)) - dataKeys, missingDataKeys := deriveKeys(expiryEntry) + dataKeys, missingDataKeys, bootKVHashesKeys := deriveKeys(expiryEntry) for _, dataKey := range dataKeys { batch.Delete(encodeDataKey(dataKey)) @@ -577,6 +584,10 @@ func (s *Store) purgeExpiredData(minBlkNum, maxBlkNum uint64) error { ) } + for _, bootKVHashesKey := range bootKVHashesKeys { + batch.Delete(encodeBootKVHashesKey(bootKVHashesKey)) + } + if err := s.db.WriteBatch(batch, false); err != nil { return err }