diff --git a/gossip/privdata/coordinator.go b/gossip/privdata/coordinator.go index 0ee68def231..e2d6a01eef1 100644 --- a/gossip/privdata/coordinator.go +++ b/gossip/privdata/coordinator.go @@ -124,6 +124,7 @@ type coordinator struct { Support store *transientstore.Store transientBlockRetention uint64 + logger util.Logger metrics *metrics.PrivdataMetrics pullRetryThreshold time.Duration skipPullingInvalidTransactions bool @@ -138,6 +139,7 @@ func NewCoordinator(mspID string, support Support, store *transientstore.Store, store: store, selfSignedData: selfSignedData, transientBlockRetention: config.TransientBlockRetention, + logger: logger.With("channel", support.ChainID), metrics: metrics, pullRetryThreshold: config.PullRetryThreshold, skipPullingInvalidTransactions: config.SkipPullingInvalidTransactions, @@ -154,15 +156,15 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa return errors.New("Block header is nil") } - logger.Infof("[%s] Received block [%d] from buffer", c.ChainID, block.Header.Number) + c.logger.Infof("Received block [%d] from buffer", block.Header.Number) - logger.Debugf("[%s] Validating block [%d]", c.ChainID, block.Header.Number) + c.logger.Debugf("Validating block [%d]", block.Header.Number) validationStart := time.Now() err := c.Validator.Validate(block) c.reportValidationDuration(time.Since(validationStart)) if err != nil { - logger.Errorf("Validation failed: %+v", err) + c.logger.Errorf("Validation failed: %+v", err) return err } @@ -204,7 +206,7 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa } pvtdataToRetrieve, err := c.getTxPvtdataInfoFromBlock(block) if err != nil { - logger.Warningf("Failed to get private data info from block: %s", err) + c.logger.Warningf("Failed to get private data info from block: %s", err) return err } @@ -212,7 +214,7 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa // RetrievePvtdata checks this peer's eligibility and then retreives from cache, transient store, or from a remote peer. retrievedPvtdata, err := pdp.RetrievePvtdata(pvtdataToRetrieve) if err != nil { - logger.Warningf("Failed to retrieve pvtdata: %s", err) + c.logger.Warningf("Failed to retrieve pvtdata: %s", err) return err } @@ -266,16 +268,16 @@ func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo protou } sp, err := c.CollectionStore.RetrieveCollectionAccessPolicy(cc) if err != nil { - logger.Warningf("Failed obtaining policy for collection criteria [%#v]: %s", cc, err) + c.logger.Warningf("Failed obtaining policy for collection criteria [%#v]: %s", cc, err) continue } isAuthorized := sp.AccessFilter() if isAuthorized == nil { - logger.Warningf("Failed obtaining filter for collection criteria [%#v]", cc) + c.logger.Warningf("Failed obtaining filter for collection criteria [%#v]", cc) continue } if !isAuthorized(peerAuthInfo) { - logger.Debugf("Skipping collection criteria [%#v] because peer isn't authorized", cc) + c.logger.Debugf("Skipping collection criteria [%#v] because peer isn't authorized", cc) continue } seqs2Namespaces.addCollection(uint64(seqInBlock), txPvtDataItem.WriteSet.DataModel, ns.Namespace, col) @@ -322,7 +324,7 @@ func (c *coordinator) getTxPvtdataInfoFromBlock(block *common.Block) ([]*ledger. colConfig, err := c.CollectionStore.RetrieveCollectionConfig(cc) if err != nil { - logger.Warningf("Failed to retrieve collection config for collection criteria [%#v]: %s", cc, err) + c.logger.Warningf("Failed to retrieve collection config for collection criteria [%#v]: %s", cc, err) return nil, err } col := &ledger.CollectionPvtdataInfo{ diff --git a/gossip/privdata/dataretriever.go b/gossip/privdata/dataretriever.go index a482f7cfd38..ac6cbe76bc8 100644 --- a/gossip/privdata/dataretriever.go +++ b/gossip/privdata/dataretriever.go @@ -32,14 +32,16 @@ type StorageDataRetriever interface { } type dataRetriever struct { + logger util.Logger store *transientstore.Store committer committer.Committer } // NewDataRetriever constructing function for implementation of the // StorageDataRetriever interface -func NewDataRetriever(store *transientstore.Store, committer committer.Committer) StorageDataRetriever { +func NewDataRetriever(channelID string, store *transientstore.Store, committer committer.Committer) StorageDataRetriever { return &dataRetriever{ + logger: logger.With("channel", channelID), store: store, committer: committer, } @@ -54,7 +56,7 @@ func (dr *dataRetriever) CollectionRWSet(digests []*protosgossip.PvtDataDigest, return nil, false, errors.Wrap(err, "wasn't able to read ledger height") } if height <= blockNum { - logger.Debug("Current ledger height ", height, "is below requested block sequence number", + dr.logger.Debug("Current ledger height ", height, "is below requested block sequence number", blockNum, "retrieving private data from transient store") } @@ -68,7 +70,7 @@ func (dr *dataRetriever) CollectionRWSet(digests []*protosgossip.PvtDataDigest, } pvtRWSet, err := dr.fromTransientStore(dig, filter) if err != nil { - logger.Errorf("couldn't read from transient store private read-write set, "+ + dr.logger.Errorf("couldn't read from transient store private read-write set, "+ "digest %+v, because of %s", dig, err) continue } @@ -108,7 +110,7 @@ func (dr *dataRetriever) fromLedger(digests []*protosgossip.PvtDataDigest, block pvtRWSetWithConfig := &util.PrivateRWSetWithConfig{} for _, data := range pvtData { if data.WriteSet == nil { - logger.Warning("Received nil write set for collection tx in block", data.SeqInBlock, "block number", blockNum) + dr.logger.Warning("Received nil write set for collection tx in block", data.SeqInBlock, "block number", blockNum) continue } @@ -177,25 +179,25 @@ func (dr *dataRetriever) fromTransientStore(dig *protosgossip.PvtDataDigest, fil } rws := res.PvtSimulationResultsWithConfig if rws == nil { - logger.Debug("Skipping nil PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight) + dr.logger.Debug("Skipping nil PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight) continue } txPvtRWSet := rws.PvtRwset if txPvtRWSet == nil { - logger.Debug("Skipping empty PvtRwset of PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight) + dr.logger.Debug("Skipping empty PvtRwset of PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight) continue } colConfigs, found := rws.CollectionConfigs[dig.Namespace] if !found { - logger.Error("No collection config was found for chaincode", dig.Namespace, "collection name", + dr.logger.Error("No collection config was found for chaincode", dig.Namespace, "collection name", dig.Namespace, "txID", dig.TxId) continue } configs := extractCollectionConfig(colConfigs, dig.Collection) if configs == nil { - logger.Error("No collection config was found for collection", dig.Collection, + dr.logger.Error("No collection config was found for collection", dig.Collection, "namespace", dig.Namespace, "txID", dig.TxId) continue } @@ -216,13 +218,13 @@ func (dr *dataRetriever) extractPvtRWsets(pvtRWSets []*rwset.NsPvtReadWriteSet, for _, nsws := range pvtRWSets { // and in each namespace - iterate over all collections if nsws.Namespace != namespace { - logger.Debug("Received private data namespace ", nsws.Namespace, " instead of ", namespace, " skipping...") + dr.logger.Debug("Received private data namespace ", nsws.Namespace, " instead of ", namespace, " skipping...") continue } for _, col := range nsws.CollectionPvtRwset { // This isn't the collection we're looking for if col.CollectionName != collectionName { - logger.Debug("Received private data collection ", col.CollectionName, " instead of ", collectionName, " skipping...") + dr.logger.Debug("Received private data collection ", col.CollectionName, " instead of ", collectionName, " skipping...") continue } // Add the collection pRWset to the accumulated set diff --git a/gossip/privdata/dataretriever_test.go b/gossip/privdata/dataretriever_test.go index 1ed10a88c30..2255fbd95af 100644 --- a/gossip/privdata/dataretriever_test.go +++ b/gossip/privdata/dataretriever_test.go @@ -38,7 +38,7 @@ func TestNewDataRetriever_GetDataFromTransientStore(t *testing.T) { committer.On("LedgerHeight").Return(uint64(1), nil) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) store.Persist(txID, 2, &transientstore.TxPvtReadWriteSetWithConfigInfo{ PvtRwset: &rwset.TxPvtReadWriteSet{ @@ -127,7 +127,7 @@ func TestNewDataRetriever_GetDataFromLedger(t *testing.T) { historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, namespace).Return(newCollectionConfig(collectionName), nil) committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) // Request digest for private data which is greater than current ledger height // to make it query ledger for missed private data @@ -173,7 +173,7 @@ func TestNewDataRetriever_FailGetPvtDataFromLedger(t *testing.T) { committer.On("GetPvtDataByNum", uint64(5), mock.Anything). Return(nil, errors.New("failing retrieving private data")) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) // Request digest for private data which is greater than current ledger height // to make it query transient store for missed private data @@ -218,7 +218,7 @@ func TestNewDataRetriever_GetOnlyRelevantPvtData(t *testing.T) { historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, namespace).Return(newCollectionConfig(collectionName), nil) committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) // Request digest for private data which is greater than current ledger height // to make it query transient store for missed private data @@ -306,7 +306,7 @@ func TestNewDataRetriever_GetMultipleDigests(t *testing.T) { historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, ns2).Return(newCollectionConfig(col2), nil) committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) // Request digest for private data which is greater than current ledger height // to make it query transient store for missed private data @@ -381,7 +381,7 @@ func TestNewDataRetriever_EmptyWriteSet(t *testing.T) { historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, ns1).Return(newCollectionConfig(col1), nil) committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{ Namespace: ns1, @@ -433,7 +433,7 @@ func TestNewDataRetriever_FailedObtainConfigHistoryRetriever(t *testing.T) { committer.On("GetPvtDataByNum", uint64(5), mock.Anything).Return(result, nil) committer.On("GetConfigHistoryRetriever").Return(nil, errors.New("failed to obtain ConfigHistoryRetriever")) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) _, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{ Namespace: ns1, @@ -488,9 +488,8 @@ func TestNewDataRetriever_NoCollectionConfig(t *testing.T) { Return(nil, nil) committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) assertion := assert.New(t) - _, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{ Namespace: ns1, Collection: col1, @@ -522,7 +521,7 @@ func TestNewDataRetriever_FailedGetLedgerHeight(t *testing.T) { col1 := "testCollectionName1" committer.On("LedgerHeight").Return(uint64(0), errors.New("failed to read ledger height")) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) _, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{ Namespace: ns1, @@ -548,7 +547,7 @@ func TestNewDataRetriever_EmptyPvtRWSetInTransientStore(t *testing.T) { committer.On("LedgerHeight").Return(uint64(1), nil) - retriever := NewDataRetriever(store.store, committer) + retriever := NewDataRetriever("testchannel", store.store, committer) rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{ Namespace: namespace, diff --git a/gossip/privdata/distributor.go b/gossip/privdata/distributor.go index cb04ed1e340..403a75cd95f 100644 --- a/gossip/privdata/distributor.go +++ b/gossip/privdata/distributor.go @@ -78,6 +78,7 @@ type distributorImpl struct { gossipAdapter CollectionAccessFactory pushAckTimeout time.Duration + logger util.Logger metrics *metrics.PrivdataMetrics } @@ -130,6 +131,7 @@ func NewDistributor(chainID string, gossip gossipAdapter, factory CollectionAcce gossipAdapter: gossip, CollectionAccessFactory: factory, pushAckTimeout: pushAckTimeout, + logger: logger.With("channel", chainID), metrics: metrics, } } @@ -157,7 +159,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string, namespace := pvtRwset.Namespace configPackage, found := privDataWithConfig.CollectionConfigs[namespace] if !found { - logger.Error("Collection config package for", namespace, "chaincode is not provided") + d.logger.Error("Collection config package for", namespace, "chaincode is not provided") return nil, errors.New(fmt.Sprint("collection config package for", namespace, "chaincode is not provided")) } @@ -165,19 +167,19 @@ func (d *distributorImpl) computeDisseminationPlan(txID string, colCP, err := d.getCollectionConfig(configPackage, collection) collectionName := collection.CollectionName if err != nil { - logger.Error("Could not find collection access policy for", namespace, " and collection", collectionName, "error", err) + d.logger.Error("Could not find collection access policy for", namespace, " and collection", collectionName, "error", err) return nil, errors.WithMessage(err, fmt.Sprint("could not find collection access policy for", namespace, " and collection", collectionName, "error", err)) } colAP, err := d.AccessPolicy(colCP, d.chainID) if err != nil { - logger.Error("Could not obtain collection access policy, collection name", collectionName, "due to", err) + d.logger.Error("Could not obtain collection access policy, collection name", collectionName, "due to", err) return nil, errors.Wrap(err, fmt.Sprint("Could not obtain collection access policy, collection name", collectionName, "due to", err)) } colFilter := colAP.AccessFilter() if colFilter == nil { - logger.Error("Collection access policy for", collectionName, "has no filter") + d.logger.Error("Collection access policy for", collectionName, "has no filter") return nil, errors.Errorf("No collection access policy filter computed for %v", collectionName) } @@ -186,7 +188,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string, return nil, errors.WithStack(err) } - logger.Debugf("Computing dissemination plan for collection [%s]", collectionName) + d.logger.Debugf("Computing dissemination plan for collection [%s]", collectionName) dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg) if err != nil { return nil, errors.WithMessagef(err, "could not build private data dissemination plan for chaincode %s and collection %s", namespace, collectionName) @@ -220,7 +222,7 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces }) if err != nil { - logger.Error("Failed to retrieve peer routing filter for channel", d.chainID, ":", err) + d.logger.Error("Failed to retrieve peer routing filter for channel", d.chainID, ":", err) return nil, err } @@ -301,8 +303,8 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces maximumPeerRemainingCount-- if maximumPeerRemainingCount == 0 { - logger.Debug("MaximumPeerCount satisfied") - logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug) + d.logger.Debug("MaximumPeerCount satisfied") + d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug) return disseminationPlan, nil } } @@ -314,7 +316,7 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces numRemainingPeersToSelect = len(remainingPeersAcrossOrgs) } if numRemainingPeersToSelect > 0 { - logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect) + d.logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect) } for maximumPeerRemainingCount > 0 && len(remainingPeersAcrossOrgs) > 0 { required := 1 @@ -350,7 +352,7 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs[:selectedPeerIndex], remainingPeersAcrossOrgs[selectedPeerIndex+1:]...) } - logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug) + d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug) return disseminationPlan, nil } @@ -397,7 +399,7 @@ func (d *distributorImpl) disseminate(disseminationPlan []*dissemination) error if err != nil { atomic.AddUint32(&failures, 1) m := dis.msg.GetPrivateData().Payload - logger.Error("Failed disseminating private RWSet for TxID", m.TxId, ", namespace", m.Namespace, "collection", m.CollectionName, ":", err) + d.logger.Error("Failed disseminating private RWSet for TxID", m.TxId, ", namespace", m.Namespace, "collection", m.CollectionName, ":", err) } }(dis) } diff --git a/gossip/privdata/pull.go b/gossip/privdata/pull.go index 1b29c07837e..c844cee0cbe 100644 --- a/gossip/privdata/pull.go +++ b/gossip/privdata/pull.go @@ -69,6 +69,7 @@ type gossip interface { } type puller struct { + logger util.Logger metrics *metrics.PrivdataMetrics pubSub *util.PubSub stopChan chan struct{} @@ -85,6 +86,7 @@ type puller struct { func NewPuller(metrics *metrics.PrivdataMetrics, cs privdata.CollectionStore, g gossip, dataRetriever PrivateDataRetriever, factory CollectionAccessFactory, channel string, btlPullMargin uint64) *puller { p := &puller{ + logger: logger.With("channel", channel), metrics: metrics, pubSub: util.NewPubSub(), stopChan: make(chan struct{}), @@ -128,7 +130,7 @@ func (p *puller) listen() { } func (p *puller) handleRequest(message protoext.ReceivedMessage) { - logger.Debug("Got", message.GetGossipMessage(), "from", message.GetConnectionInfo().Endpoint) + p.logger.Debug("Got", message.GetGossipMessage(), "from", message.GetConnectionInfo().Endpoint) message.Respond(&protosgossip.GossipMessage{ Channel: []byte(p.channel), Tag: protosgossip.GossipMessage_CHAN_ONLY, @@ -147,7 +149,7 @@ func (p *puller) createResponse(message protoext.ReceivedMessage) []*protosgossi connectionEndpoint := message.GetConnectionInfo().Endpoint defer func() { - logger.Debug("Returning", connectionEndpoint, len(returned), "elements") + p.logger.Debug("Returning", connectionEndpoint, len(returned), "elements") }() msg := message.GetGossipMessage() @@ -159,7 +161,7 @@ func (p *puller) createResponse(message protoext.ReceivedMessage) []*protosgossi dig2rwSets, wasFetchedFromLedger, err := p.CollectionRWSet(digests, blockNum) p.metrics.RetrieveDuration.With("channel", p.channel).Observe(time.Since(start).Seconds()) if err != nil { - logger.Warningf("could not obtain private collection rwset for block %d, because of %s, continue...", blockNum, err) + p.logger.Warningf("could not obtain private collection rwset for block %d, because of %s, continue...", blockNum, err) continue } returned = append(returned, p.filterNotEligible(dig2rwSets, wasFetchedFromLedger, protoutil.SignedData{ @@ -182,15 +184,15 @@ func groupDigestsByBlockNum(digests []*protosgossip.PvtDataDigest) map[uint64][] func (p *puller) handleResponse(message protoext.ReceivedMessage) { msg := message.GetGossipMessage().GetPrivateRes() - logger.Debug("Got", msg, "from", message.GetConnectionInfo().Endpoint) + p.logger.Debug("Got", msg, "from", message.GetConnectionInfo().Endpoint) for _, el := range msg.Elements { if el.Digest == nil { - logger.Warning("Got nil digest from", message.GetConnectionInfo().Endpoint, "aborting") + p.logger.Warning("Got nil digest from", message.GetConnectionInfo().Endpoint, "aborting") return } hash, err := hashDigest(el.Digest) if err != nil { - logger.Warning("Failed hashing digest from", message.GetConnectionInfo().Endpoint, "aborting") + p.logger.Warning("Failed hashing digest from", message.GetConnectionInfo().Endpoint, "aborting") return } p.pubSub.Publish(hash, el) @@ -243,11 +245,11 @@ func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdataco // Get a list of peers per channel allFilters := dig2Filter.flattenFilterValues() members := p.waitForMembership() - logger.Debug("Total members in channel:", members) + p.logger.Debug("Total members in channel:", members) members = filter.AnyMatch(members, allFilters...) - logger.Debug("Total members that fit some digest:", members) + p.logger.Debug("Total members that fit some digest:", members) if len(members) == 0 { - logger.Warning("Do not know any peer in the channel(", p.channel, ") that matches the policies , aborting") + p.logger.Warning("Do not know any peer in the channel(", p.channel, ") that matches the policies , aborting") return nil, errors.New("Empty membership") } members = randomizeMemberList(members) @@ -275,24 +277,24 @@ func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdataco } if itemsLeftToCollect == 0 { - logger.Debug("No items left to collect") + p.logger.Debug("No items left to collect") return res, nil } peer2digests, members = p.assignDigestsToPeers(members, dig2Filter) if len(peer2digests) == 0 { - logger.Warning("No available peers for digests request, "+ + p.logger.Warning("No available peers for digests request, "+ "cannot pull missing private data for following digests [%+v], peer membership: [%+v]", dig2Filter.digests(), members) return res, nil } - logger.Debug("Matched", len(dig2Filter), "digests to", len(peer2digests), "peer(s)") + p.logger.Debug("Matched", len(dig2Filter), "digests to", len(peer2digests), "peer(s)") subscriptions := p.scatterRequests(peer2digests) responses := p.gatherResponses(subscriptions) for _, resp := range responses { if len(resp.Payload) == 0 { - logger.Debug("Got empty response for", resp.Digest) + p.logger.Debug("Got empty response for", resp.Digest) continue } delete(dig2Filter, privdatacommon.DigKey{ @@ -357,13 +359,13 @@ func (p *puller) scatterRequests(peersDigestMapping peer2Digests) []util.Subscri hash, err := hashDigest(dig) if err != nil { // Shouldn't happen as we just built this message ourselves - logger.Warning("Failed creating digest", err) + p.logger.Warning("Failed creating digest", err) continue } sub := p.pubSub.Subscribe(hash, responseWaitTime) subscriptions = append(subscriptions, sub) } - logger.Debug("Sending", peer.endpoint, "request", msg.GetPrivateReq().Digests) + p.logger.Debug("Sending", peer.endpoint, "request", msg.GetPrivateReq().Digests) p.Send(msg, peer.AsRemotePeer()) } return subscriptions @@ -373,8 +375,8 @@ type peer2Digests map[remotePeer][]protosgossip.PvtDataDigest type noneSelectedPeers []discovery.NetworkMember func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Filter digestToFilterMapping) (peer2Digests, noneSelectedPeers) { - if logger.IsEnabledFor(zapcore.DebugLevel) { - logger.Debug("Matching", members, "to", dig2Filter.String()) + if p.logger.IsEnabledFor(zapcore.DebugLevel) { + p.logger.Debug("Matching", members, "to", dig2Filter.String()) } res := make(map[remotePeer][]protosgossip.PvtDataDigest) // Create a mapping between peer and digests to ask for @@ -382,12 +384,12 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil // Find a peer that is a preferred peer selectedPeer := filter.First(members, collectionFilter.preferredPeer) if selectedPeer == nil { - logger.Debug("No preferred peer found for", dig) + p.logger.Debug("No preferred peer found for", dig) // Find some peer that is in the collection selectedPeer = filter.First(members, collectionFilter.anyPeer) } if selectedPeer == nil { - logger.Debug("No peer matches txID", dig.TxId, "collection", dig.Collection) + p.logger.Debug("No peer matches txID", dig.TxId, "collection", dig.Collection) continue } // Add the peer to the mapping from peer to digest slice @@ -561,14 +563,14 @@ func (p *puller) getPurgedCollections(members []discovery.NetworkMember, dig2Fil for dig := range dig2Filter { purged, err := p.purgedFilter(dig) if err != nil { - logger.Debug("Failed to obtain purged filter for digest %v", dig, "error", err) + p.logger.Debug("Failed to obtain purged filter for digest %v", dig, "error", err) continue } membersWithPurgedData := filter.AnyMatch(members, purged) // at least one peer already purged the data if len(membersWithPurgedData) > 0 { - logger.Debugf("Private data on channel [%s], chaincode [%s], collection name [%s] for txID = [%s],"+ + p.logger.Debugf("Private data on channel [%s], chaincode [%s], collection name [%s] for txID = [%s],"+ "has been purged at peers [%v]", p.channel, dig.Namespace, dig.Collection, dig.TxId, membersWithPurgedData) res = append(res, dig) @@ -590,7 +592,7 @@ func (p *puller) purgedFilter(dig privdatacommon.DigKey) (filter.RoutingFilter, return func(peer discovery.NetworkMember) bool { if peer.Properties == nil { - logger.Debugf("No properties provided for peer %s", peer.Endpoint) + p.logger.Debugf("No properties provided for peer %s", peer.Endpoint) return false } // BTL equals to zero has semantic of never expires @@ -603,7 +605,7 @@ func (p *puller) purgedFilter(dig privdatacommon.DigKey) (filter.RoutingFilter, isPurged := peerLedgerHeightWithMargin >= expirationSeqNum if isPurged { - logger.Debugf("skipping peer [%s], since pvt for channel [%s], txID = [%s], "+ + p.logger.Debugf("skipping peer [%s], since pvt for channel [%s], txID = [%s], "+ "collection [%s] has been purged or will soon be purged, BTL=[%d]", peer.Endpoint, p.channel, dig.TxId, cc.Collection, colPersistConfig.BlockToLive()) } @@ -615,11 +617,11 @@ func (p *puller) filterNotEligible(dig2rwSets Dig2PvtRWSetWithConfig, shouldChec var returned []*protosgossip.PvtDataElement for d, rwSets := range dig2rwSets { if rwSets == nil { - logger.Errorf("No private rwset for [%s] channel, chaincode [%s], collection [%s], txID = [%s] is available, skipping...", + p.logger.Errorf("No private rwset for [%s] channel, chaincode [%s], collection [%s], txID = [%s] is available, skipping...", p.channel, d.Namespace, d.Collection, d.TxId) continue } - logger.Debug("Found", len(rwSets.RWSet), "for TxID", d.TxId, ", collection", d.Collection, "for", endpoint) + p.logger.Debug("Found", len(rwSets.RWSet), "for TxID", d.TxId, ", collection", d.Collection, "for", endpoint) if len(rwSets.RWSet) == 0 { continue } @@ -629,19 +631,19 @@ func (p *puller) filterNotEligible(dig2rwSets Dig2PvtRWSetWithConfig, shouldChec if !eligibleForCollection { colAP, err := p.AccessPolicy(rwSets.CollectionConfig, p.channel) if err != nil { - logger.Debug("No policy found for channel", p.channel, ", collection", d.Collection, "txID", d.TxId, ":", err, "skipping...") + p.logger.Debug("No policy found for channel", p.channel, ", collection", d.Collection, "txID", d.TxId, ":", err, "skipping...") continue } colFilter := colAP.AccessFilter() if colFilter == nil { - logger.Debug("Collection ", d.Collection, " has no access filter, txID", d.TxId, "skipping...") + p.logger.Debug("Collection ", d.Collection, " has no access filter, txID", d.TxId, "skipping...") continue } eligibleForCollection = colFilter(signedData) } if !eligibleForCollection { - logger.Debug("Peer", endpoint, "isn't eligible for txID", d.TxId, "at collection", d.Collection) + p.logger.Debug("Peer", endpoint, "isn't eligible for txID", d.TxId, "at collection", d.Collection) continue } diff --git a/gossip/privdata/pull_test.go b/gossip/privdata/pull_test.go index 8b35b09e432..79741fad874 100644 --- a/gossip/privdata/pull_test.go +++ b/gossip/privdata/pull_test.go @@ -1116,7 +1116,7 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) { historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, ns2).Return(newCollectionConfig(col2), nil) committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil) - dataRetreiver := &counterDataRetreiver{PrivateDataRetriever: NewDataRetriever(store, committer), numberOfCalls: 0} + dataRetreiver := &counterDataRetreiver{PrivateDataRetriever: NewDataRetriever("testchannel", store, committer), numberOfCalls: 0} p2.PrivateDataRetriever = dataRetreiver dig1 := &privdatacommon.DigKey{ diff --git a/gossip/privdata/reconcile.go b/gossip/privdata/reconcile.go index 5a382b5a65a..75333739ee0 100644 --- a/gossip/privdata/reconcile.go +++ b/gossip/privdata/reconcile.go @@ -20,6 +20,7 @@ import ( "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/gossip/metrics" privdatacommon "github.com/hyperledger/fabric/gossip/privdata/common" + "github.com/hyperledger/fabric/gossip/util" "github.com/pkg/errors" ) @@ -56,6 +57,7 @@ type PvtDataReconciler interface { type Reconciler struct { channel string + logger util.Logger metrics *metrics.PrivdataMetrics ReconcileSleepInterval time.Duration ReconcileBatchSize int @@ -83,9 +85,11 @@ func (*NoOpReconciler) Stop() { // NewReconciler creates a new instance of reconciler func NewReconciler(channel string, metrics *metrics.PrivdataMetrics, c committer.Committer, fetcher ReconciliationFetcher, config *PrivdataConfig) *Reconciler { - logger.Debug("Private data reconciliation is enabled") + reconcilerLogger := logger.With("channel", channel) + reconcilerLogger.Debug("Private data reconciliation is enabled") return &Reconciler{ channel: channel, + logger: reconcilerLogger, metrics: metrics, ReconcileSleepInterval: config.ReconcileSleepInterval, ReconcileBatchSize: config.ReconcileBatchSize, @@ -113,9 +117,9 @@ func (r *Reconciler) run() { case <-r.stopChan: return case <-time.After(r.ReconcileSleepInterval): - logger.Debug("Start reconcile missing private info") + r.logger.Debug("Start reconcile missing private info") if err := r.reconcile(); err != nil { - logger.Error("Failed to reconcile missing private info, error: ", err.Error()) + r.logger.Error("Failed to reconcile missing private info, error: ", err.Error()) break } } @@ -126,11 +130,11 @@ func (r *Reconciler) run() { func (r *Reconciler) reconcile() error { missingPvtDataTracker, err := r.GetMissingPvtDataTracker() if err != nil { - logger.Error("reconciliation error when trying to get missingPvtDataTracker:", err) + r.logger.Error("reconciliation error when trying to get missingPvtDataTracker:", err) return err } if missingPvtDataTracker == nil { - logger.Error("got nil as MissingPvtDataTracker, exiting...") + r.logger.Error("got nil as MissingPvtDataTracker, exiting...") return errors.New("got nil as MissingPvtDataTracker, exiting...") } totalReconciled, minBlock, maxBlock := 0, uint64(math.MaxUint64), uint64(0) @@ -140,25 +144,25 @@ func (r *Reconciler) reconcile() error { for { missingPvtDataInfo, err := missingPvtDataTracker.GetMissingPvtDataInfoForMostRecentBlocks(r.ReconcileBatchSize) if err != nil { - logger.Error("reconciliation error when trying to get missing pvt data info recent blocks:", err) + r.logger.Error("reconciliation error when trying to get missing pvt data info recent blocks:", err) return err } // if missingPvtDataInfo is nil, len will return 0 if len(missingPvtDataInfo) == 0 { if totalReconciled > 0 { - logger.Infof("Reconciliation cycle finished successfully. reconciled %d private data keys from blocks range [%d - %d]", totalReconciled, minBlock, maxBlock) + r.logger.Infof("Reconciliation cycle finished successfully. reconciled %d private data keys from blocks range [%d - %d]", totalReconciled, minBlock, maxBlock) } else { - logger.Debug("Reconciliation cycle finished successfully. no items to reconcile") + r.logger.Debug("Reconciliation cycle finished successfully. no items to reconcile") } return nil } - logger.Debug("got from ledger", len(missingPvtDataInfo), "blocks with missing private data, trying to reconcile...") + r.logger.Debug("got from ledger", len(missingPvtDataInfo), "blocks with missing private data, trying to reconcile...") dig2collectionCfg, minB, maxB := r.getDig2CollectionConfig(missingPvtDataInfo) fetchedData, err := r.FetchReconciledItems(dig2collectionCfg) if err != nil { - logger.Error("reconciliation error when trying to fetch missing items from different peers:", err) + r.logger.Error("reconciliation error when trying to fetch missing items from different peers:", err) return err } @@ -211,7 +215,7 @@ func (r *Reconciler) getDig2CollectionConfig(missingPvtDataInfo ledger.MissingPv if _, exists := collectionConfigCache[collConfigKey]; !exists { collectionConfig, err := r.getMostRecentCollectionConfig(pvtDataInfo.Namespace, pvtDataInfo.Collection, blockNum) if err != nil { - logger.Debug(err) + r.logger.Debug(err) continue } collectionConfigCache[collConfigKey] = collectionConfig @@ -268,7 +272,7 @@ func (r *Reconciler) preparePvtDataToCommit(elements []*protosgossip.PvtDataElem } for seqInBlock, nsRWS := range rwSetKeys.bySeqsInBlock() { rwsets := nsRWS.toRWSet() - logger.Debugf("Preparing to commit [%d] private write set, missed from transaction index [%d] of block number [%d]", len(rwsets.NsPvtRwset), seqInBlock, blockNum) + r.logger.Debugf("Preparing to commit [%d] private write set, missed from transaction index [%d] of block number [%d]", len(rwsets.NsPvtRwset), seqInBlock, blockNum) blockPvtData.WriteSets[seqInBlock] = &ledger.TxPvtData{ SeqInBlock: seqInBlock, WriteSet: rwsets, @@ -282,7 +286,7 @@ func (r *Reconciler) preparePvtDataToCommit(elements []*protosgossip.PvtDataElem func (r *Reconciler) logMismatched(pvtdataMismatched []*ledger.PvtdataHashMismatch) { if len(pvtdataMismatched) > 0 { for _, hashMismatch := range pvtdataMismatched { - logger.Warningf("failed to reconcile pvtdata chaincode %s, collection %s, block num %d, tx num %d due to hash mismatch", + r.logger.Warningf("failed to reconcile pvtdata chaincode %s, collection %s, block num %d, tx num %d due to hash mismatch", hashMismatch.Namespace, hashMismatch.Collection, hashMismatch.BlockNum, hashMismatch.TxNum) } } diff --git a/gossip/privdata/reconcile_test.go b/gossip/privdata/reconcile_test.go index 6b6d1dba114..955a872893a 100644 --- a/gossip/privdata/reconcile_test.go +++ b/gossip/privdata/reconcile_test.go @@ -41,7 +41,8 @@ func TestNoItemsToReconcile(t *testing.T) { fetcher.On("FetchReconciledItems", mock.Anything).Return(nil, errors.New("this function shouldn't be called")) r := &Reconciler{ - channel: "", + channel: "mychannel", + logger: logger.With("channel", "mychannel"), metrics: metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics, ReconcileSleepInterval: time.Minute, ReconcileBatchSize: 1, @@ -82,7 +83,8 @@ func TestNotReconcilingWhenCollectionConfigNotAvailable(t *testing.T) { }).Return(nil, errors.New("called with no digests")) r := &Reconciler{ - channel: "", + channel: "mychannel", + logger: logger.With("channel", "mychannel"), metrics: metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics, ReconcileSleepInterval: time.Minute, ReconcileBatchSize: 1, @@ -188,6 +190,7 @@ func TestReconciliationHappyPathWithoutScheduler(t *testing.T) { r := &Reconciler{ channel: "mychannel", + logger: logger.With("channel", "mychannel"), metrics: metrics, ReconcileSleepInterval: time.Minute, ReconcileBatchSize: 1, @@ -486,7 +489,8 @@ func TestReconciliationFailedToCommit(t *testing.T) { committer.On("CommitPvtDataOfOldBlocks", mock.Anything, mock.Anything).Return(nil, errors.New("failed to commit")) r := &Reconciler{ - channel: "", + channel: "mychannel", + logger: logger.With("channel", "mychannel"), metrics: metrics.NewGossipMetrics(&disabled.Provider{}).PrivdataMetrics, ReconcileSleepInterval: time.Minute, ReconcileBatchSize: 1, diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 110ac4b00b3..d900d79e2a4 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -335,7 +335,7 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order servicesAdapter := &state.ServicesMediator{GossipAdapter: g, MCSAdapter: g.mcs} // Initialize private data fetcher - dataRetriever := gossipprivdata.NewDataRetriever(store, support.Committer) + dataRetriever := gossipprivdata.NewDataRetriever(channelID, store, support.Committer) collectionAccessFactory := gossipprivdata.NewCollectionAccessFactory(support.IdDeserializeFactory) fetcher := gossipprivdata.NewPuller(g.metrics.PrivdataMetrics, support.CollectionStore, g.gossipSvc, dataRetriever, collectionAccessFactory, channelID, g.serviceConfig.BtlPullMargin)