Skip to content

Commit

Permalink
Merge pull request hyperledger#8 from andrew-coleman/orderer-main
Browse files Browse the repository at this point in the history
Fix build
  • Loading branch information
yacovm authored Jun 23, 2022
2 parents 0d4fcae + d910783 commit b9c4888
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 14 deletions.
12 changes: 10 additions & 2 deletions orderer/common/cluster/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/internal/pkg/identity"
Expand Down Expand Up @@ -83,6 +82,7 @@ func (s suspectSet) has(entry string) bool {
type groupCollection struct {
entries []EndpointCriteria
cursor int
lock sync.Mutex
}

// select_random_entries returns n entries from groupCollection
Expand All @@ -91,6 +91,9 @@ func (g *groupCollection) select_random_entries(n int) []EndpointCriteria {
// and not repeating
nbr := []EndpointCriteria{}

g.lock.Lock()
defer g.lock.Unlock()

for i := 0; i < n; i++ {
// select the next entry, circularly looping over the array
k := g.entries[(g.cursor+i)%len(g.entries)]
Expand All @@ -108,6 +111,7 @@ type AttestationPuller struct {
openConnections []attestationConnectionInfo
cancelStream func()
Logger *flogging.FabricLogger
lock sync.Mutex
}

func (p *AttestationPuller) seekNextEnvelope(startSeq uint64) (*common.Envelope, error) {
Expand All @@ -123,6 +127,8 @@ func (p *AttestationPuller) seekNextEnvelope(startSeq uint64) (*common.Envelope,
}

func (p *AttestationPuller) closeEndpoints() {
p.lock.Lock()
defer p.lock.Unlock()
// Close the streams and grpc connections
for _, oc := range p.openConnections {
oc.stream.cancelFunc()
Expand Down Expand Up @@ -156,7 +162,9 @@ func (p *AttestationPuller) pullAttestationBlock(ec EndpointCriteria, env *commo
// append the stream to bf.openConnections, so that Close can close all streams
// dial connections
cni := attestationConnectionInfo{stream: stream, conn: conn}
p.lock.Lock()
p.openConnections = append(p.openConnections, cni)
p.lock.Unlock()
p.Logger.Debugf("Add grpc conn to openConnections list.")
resp, err := stream.Recv()
if err != nil {
Expand Down Expand Up @@ -212,7 +220,7 @@ func (stream *impatientAttestationStream) abort() {

// Recv blocks until a response is received from the stream or the
// timeout expires.
func (stream *impatientAttestationStream) Recv() (*ab.BlockAttestationResponse, error) {
func (stream *impatientAttestationStream) Recv() (*orderer.BlockAttestationResponse, error) {
// Initialize a timeout to cancel the stream when it expires
timeout := time.NewTimer(stream.waitTimeout)
defer timeout.Stop()
Expand Down
10 changes: 10 additions & 0 deletions orderer/common/cluster/block_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"crypto/rand"
"encoding/hex"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -245,7 +246,10 @@ func TestBlockFetcherBFTBehaviorBlockWithhold(t *testing.T) {
}

attestation_source_created := false
lock := sync.Mutex{}
bf.AttestationSourceFactory = func(c cluster.FetcherConfig) cluster.AttestationSource {
lock.Lock()
defer lock.Unlock()
if !attestation_source_created {
// first attestation source created withholds attestation
attestation_source_created = true
Expand Down Expand Up @@ -453,7 +457,10 @@ func TestBlockFetcherBFTBehaviorPullAttestationError(t *testing.T) {
}

attestation_source_created := false
lock := sync.Mutex{}
bf.AttestationSourceFactory = func(c cluster.FetcherConfig) cluster.AttestationSource {
lock.Lock()
defer lock.Unlock()
if !attestation_source_created {
// first attestation source created returns error
attestation_source_created = true
Expand Down Expand Up @@ -535,7 +542,10 @@ func TestBlockFetcherBFTBehaviorAttestationsLessThanF(t *testing.T) {
}

attestation_source_created := false
lock := sync.Mutex{}
bf.AttestationSourceFactory = func(c cluster.FetcherConfig) cluster.AttestationSource {
lock.Lock()
defer lock.Unlock()
if !attestation_source_created {
// first attestation source created returnsattestation block
attestation_source_created = true
Expand Down
5 changes: 2 additions & 3 deletions orderer/common/server/attestationserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
cb "github.com/hyperledger/fabric-protos-go/common"
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/policies"
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
Expand All @@ -31,14 +30,14 @@ type attestationserver struct {
// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
func NewAttestationService(
r *multichannel.Registrar,
metricsProvider metrics.Provider,
metrics *deliver.Metrics,
debug *localconfig.Debug,
timeWindow time.Duration,
mutualTLS bool,
expirationCheckDisabled bool,
) ab.BlockAttestationsServer {
s := &attestationserver{
dh: deliver.NewHandler(deliverSupport{Registrar: r}, timeWindow, mutualTLS, deliver.NewMetrics(metricsProvider), expirationCheckDisabled),
dh: deliver.NewHandler(deliverSupport{Registrar: r}, timeWindow, mutualTLS, metrics, expirationCheckDisabled),
debug: debug,
Registrar: r,
}
Expand Down
10 changes: 7 additions & 3 deletions orderer/common/server/attestationserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
cb "github.com/hyperledger/fabric-protos-go/common"
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/bccsp/sw"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/ledger/blockledger/fileledger"
"github.com/hyperledger/fabric/common/metrics/disabled"
Expand Down Expand Up @@ -250,7 +251,8 @@ func Test_attestationserver_BlockAttestations(t *testing.T) {

mockstream := NewAttestationServerStream()

asr := NewAttestationService(manager, &disabled.Provider{}, nil, time.Second, false, false)
metrics := deliver.NewMetrics(&disabled.Provider{})
asr := NewAttestationService(manager, metrics, nil, time.Second, false, false)
err = asr.BlockAttestations(envelope, mockstream)
require.NoError(t, err)

Expand All @@ -264,7 +266,8 @@ func Test_attestationserver_BlockAttestations(t *testing.T) {

t.Run("Block attestation returns with BAD_REQUEST when envelope is malformed", func(t *testing.T) {
mockstream := NewAttestationServerStream()
asr := NewAttestationService(&multichannel.Registrar{}, &disabled.Provider{}, nil, time.Second, false, false)
metrics := deliver.NewMetrics(&disabled.Provider{})
asr := NewAttestationService(&multichannel.Registrar{}, metrics, nil, time.Second, false, false)
err := asr.BlockAttestations(&cb.Envelope{}, mockstream)
require.NoError(t, err)
StatusResponse, _ := mockstream.RecvResponse()
Expand All @@ -273,7 +276,8 @@ func Test_attestationserver_BlockAttestations(t *testing.T) {

t.Run("Block attestion returns with NOTFOUND when custom policy check fails", func(t *testing.T) {
mockstream := NewAttestationServerStream()
asr := NewAttestationService(&multichannel.Registrar{}, &disabled.Provider{}, nil, time.Second, false, false)
metrics := deliver.NewMetrics(&disabled.Provider{})
asr := NewAttestationService(&multichannel.Registrar{}, metrics, nil, time.Second, false, false)
err := asr.BlockAttestations(envelope, mockstream)
require.NoError(t, err)
StatusResponse, _ := mockstream.RecvResponse()
Expand Down
9 changes: 7 additions & 2 deletions orderer/common/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/fabhttp"
"github.com/hyperledger/fabric/common/flogging"
floggingmetrics "github.com/hyperledger/fabric/common/flogging/metrics"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/orderer/common/bootstrap/file"
"github.com/hyperledger/fabric/orderer/common/broadcast"
"github.com/hyperledger/fabric/orderer/common/channelparticipation"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
Expand Down Expand Up @@ -269,9 +271,12 @@ func Main() {
defer adminServer.Stop()

mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
deliverMetrics := deliver.NewMetrics(metricsProvider)
broadcastMetrics := broadcast.NewMetrics(metricsProvider)
server := NewServer(
manager,
metricsProvider,
deliverMetrics,
broadcastMetrics,
&conf.Debug,
conf.General.Authentication.TimeWindow,
mutualTLS,
Expand All @@ -280,7 +285,7 @@ func Main() {

attestationService := NewAttestationService(
manager,
metricsProvider,
deliverMetrics,
&conf.Debug,
conf.General.Authentication.TimeWindow,
mutualTLS,
Expand Down
8 changes: 4 additions & 4 deletions orderer/common/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
cb "github.com/hyperledger/fabric-protos-go/common"
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/broadcast"
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
Expand Down Expand Up @@ -85,17 +84,18 @@ func (rs *responseSender) DataType() string {
// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
func NewServer(
r *multichannel.Registrar,
metricsProvider metrics.Provider,
deliverMetrics *deliver.Metrics,
broadcastMetrics *broadcast.Metrics,
debug *localconfig.Debug,
timeWindow time.Duration,
mutualTLS bool,
expirationCheckDisabled bool,
) ab.AtomicBroadcastServer {
s := &server{
dh: deliver.NewHandler(deliverSupport{Registrar: r}, timeWindow, mutualTLS, deliver.NewMetrics(metricsProvider), expirationCheckDisabled),
dh: deliver.NewHandler(deliverSupport{Registrar: r}, timeWindow, mutualTLS, deliverMetrics, expirationCheckDisabled),
bh: &broadcast.Handler{
SupportRegistrar: broadcastSupport{Registrar: r},
Metrics: broadcast.NewMetrics(metricsProvider),
Metrics: broadcastMetrics,
},
debug: debug,
Registrar: r,
Expand Down

0 comments on commit b9c4888

Please sign in to comment.