Skip to content

Commit

Permalink
[FAB-12890] golint warnings in orderer/common/cluster
Browse files Browse the repository at this point in the history
This change set addresses golint warnings.

Change-Id: I5af4289ad669a829dd01587a85eeaef4146386db
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Dec 4, 2018
1 parent 1aa5b47 commit f4b1a7e
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 37 deletions.
29 changes: 15 additions & 14 deletions orderer/common/cluster/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type BlockPuller struct {
VerifyBlockSequence BlockSequenceVerifier
Endpoints []string
// Internal state
stream *impatientStream
stream *ImpatientStream
blockBuff []*common.Block
latestSeq uint64
endpoint string
Expand Down Expand Up @@ -182,8 +182,8 @@ func (p *BlockPuller) pullBlocks(seq uint64, reConnected bool) error {
return nil
}

func (p *BlockPuller) obtainStream(reConnected bool, env *common.Envelope, seq uint64) (*impatientStream, error) {
var stream *impatientStream
func (p *BlockPuller) obtainStream(reConnected bool, env *common.Envelope, seq uint64) (*ImpatientStream, error) {
var stream *ImpatientStream
var err error
if reConnected {
p.Logger.Infof("Sending request for block %d to %s", seq, p.endpoint)
Expand Down Expand Up @@ -347,10 +347,10 @@ func (p *BlockPuller) fetchLastBlockSeq(minRequestedSequence uint64, endpoint st
return block.Header.Number, nil
}

// requestBlocks starts requesting blocks from the given endpoint, using the given streamCreator by sending
// requestBlocks starts requesting blocks from the given endpoint, using the given ImpatientStreamCreator by sending
// the given envelope.
// It returns a stream that is used to pull blocks, or error if something goes wrong.
func (p *BlockPuller) requestBlocks(endpoint string, newStream streamCreator, env *common.Envelope) (*impatientStream, error) {
func (p *BlockPuller) requestBlocks(endpoint string, newStream ImpatientStreamCreator, env *common.Envelope) (*ImpatientStream, error) {
stream, err := newStream()
if err != nil {
p.Logger.Warningf("Failed establishing deliver stream with %s", endpoint)
Expand Down Expand Up @@ -455,22 +455,23 @@ func (eib endpointInfoBucket) byEndpoints() map[string]*endpointInfo {
return infoByEndpoints
}

type streamCreator func() (*impatientStream, error)
// ImpatientStreamCreator creates an ImpatientStream
type ImpatientStreamCreator func() (*ImpatientStream, error)

// impatientStream aborts the stream if it waits for too long for a message.
type impatientStream struct {
// ImpatientStream aborts the stream if it waits for too long for a message.
type ImpatientStream struct {
waitTimeout time.Duration
orderer.AtomicBroadcast_DeliverClient
cancelFunc func()
}

func (stream *impatientStream) abort() {
func (stream *ImpatientStream) abort() {
stream.cancelFunc()
}

// Recv blocks until a response is received from the stream or the
// timeout expires.
func (stream *impatientStream) Recv() (*orderer.DeliverResponse, error) {
func (stream *ImpatientStream) Recv() (*orderer.DeliverResponse, error) {
// Initialize a timeout to cancel the stream when it expires
timeout := time.NewTimer(stream.waitTimeout)
defer timeout.Stop()
Expand Down Expand Up @@ -498,9 +499,9 @@ func (stream *impatientStream) Recv() (*orderer.DeliverResponse, error) {
}
}

// NewImpatientStream returns a streamCreator that creates impatientStreams.
func NewImpatientStream(conn *grpc.ClientConn, waitTimeout time.Duration) streamCreator {
return func() (*impatientStream, error) {
// NewImpatientStream returns a ImpatientStreamCreator that creates impatientStreams.
func NewImpatientStream(conn *grpc.ClientConn, waitTimeout time.Duration) ImpatientStreamCreator {
return func() (*ImpatientStream, error) {
abc := orderer.NewAtomicBroadcastClient(conn)
ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -511,7 +512,7 @@ func NewImpatientStream(conn *grpc.ClientConn, waitTimeout time.Duration) stream
}

once := &sync.Once{}
return &impatientStream{
return &ImpatientStream{
waitTimeout: waitTimeout,
// The stream might be canceled while Close() is being called, but also
// while a timeout expires, so ensure it's only called once.
Expand Down
6 changes: 3 additions & 3 deletions orderer/common/cluster/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (r *Replicator) channelsToPull(channels []string) []string {
puller.Close()
// Restore the previous buffer size
puller.MaxTotalBufferBytes = bufferSize
if err == NotInChannelError {
if err == ErrNotInChannel {
r.Logger.Info("I do not belong to channel", channel, ", skipping chain retrieval")
continue
}
Expand Down Expand Up @@ -305,8 +305,8 @@ type ChainInspector struct {
LastConfigBlock *common.Block
}

// NotInChannelError denotes that an ordering node is not in the channel
var NotInChannelError = errors.New("not in the channel")
// ErrNotInChannel denotes that an ordering node is not in the channel
var ErrNotInChannel = errors.New("not in the channel")

// selfMembershipPredicate determines whether the caller is found in the given config block
type selfMembershipPredicate func(configBlock *common.Block) error
Expand Down
4 changes: 2 additions & 2 deletions orderer/common/cluster/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestReplicateChainsGreenPath(t *testing.T) {
// For channel A
amIPartOfChannelMock.On("func2").Return(nil).Once()
// For channel B
amIPartOfChannelMock.On("func2").Return(cluster.NotInChannelError).Once()
amIPartOfChannelMock.On("func2").Return(cluster.ErrNotInChannel).Once()

// 22 is for the system channel, and 31 is for channel A
blocksCommittedToLedger := make(chan *common.Block, 22+31)
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestParticipant(t *testing.T) {
},
latestConfigBlockSeq: 42,
latestConfigBlock: &common.Block{Header: &common.BlockHeader{Number: 42}},
predicateReturns: cluster.NotInChannelError,
predicateReturns: cluster.ErrNotInChannel,
},
} {
t.Run(testCase.name, func(t *testing.T) {
Expand Down
33 changes: 17 additions & 16 deletions orderer/common/cluster/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,23 @@ func (dialer *PredicateDialer) ClientConfig() (comm.ClientConfig, error) {
if val == nil {
return comm.ClientConfig{}, errors.New("client config not initialized")
}
if cc, isClientConfig := val.(comm.ClientConfig); !isClientConfig {
cc, isClientConfig := val.(comm.ClientConfig)
if !isClientConfig {
err := errors.Errorf("value stored is %v, not comm.ClientConfig",
reflect.TypeOf(val))
return comm.ClientConfig{}, err
} else {
if cc.SecOpts == nil {
return comm.ClientConfig{}, errors.New("SecOpts is nil")
}
// Copy by value the secure options
secOpts := *cc.SecOpts
return comm.ClientConfig{
AsyncConnect: cc.AsyncConnect,
Timeout: cc.Timeout,
SecOpts: &secOpts,
KaOpts: cc.KaOpts,
}, nil
}
if cc.SecOpts == nil {
return comm.ClientConfig{}, errors.New("SecOpts is nil")
}
// Copy by value the secure options
secOpts := *cc.SecOpts
return comm.ClientConfig{
AsyncConnect: cc.AsyncConnect,
Timeout: cc.Timeout,
SecOpts: &secOpts,
KaOpts: cc.KaOpts,
}, nil
}

// SetConfig sets the configuration of the PredicateDialer
Expand Down Expand Up @@ -182,6 +182,7 @@ type StandardDialer struct {
Dialer *PredicateDialer
}

// Dial dials to the given address
func (bdp *StandardDialer) Dial(address string) (*grpc.ClientConn, error) {
return bdp.Dialer.Dial(address, nil)
}
Expand Down Expand Up @@ -229,7 +230,7 @@ func VerifyBlocks(blockBuff []*common.Block, signatureVerifier BlockVerifier) er
// during iteration over the block batch.
for _, block := range blockBuff {
configFromBlock, err := ConfigFromBlock(block)
if err == notAConfig {
if err == errNotAConfig {
continue
}
if err != nil {
Expand All @@ -247,7 +248,7 @@ func VerifyBlocks(blockBuff []*common.Block, signatureVerifier BlockVerifier) er
return VerifyBlockSignature(lastBlock, signatureVerifier, config)
}

var notAConfig = errors.New("not a config block")
var errNotAConfig = errors.New("not a config block")

// ConfigFromBlock returns a ConfigEnvelope if exists, or a *NotAConfigBlock error.
// It may also return some other error in case parsing failed.
Expand All @@ -272,7 +273,7 @@ func ConfigFromBlock(block *common.Block) (*common.ConfigEnvelope, error) {
return nil, errors.WithStack(err)
}
if common.HeaderType(chdr.Type) != common.HeaderType_CONFIG {
return nil, notAConfig
return nil, errNotAConfig
}
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion orderer/consensus/etcdraft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,5 +371,5 @@ func (conCert ConsenterCertificate) IsConsenterOfChannel(configBlock *common.Blo
return nil
}
}
return cluster.NotInChannelError
return cluster.ErrNotInChannel
}
2 changes: 1 addition & 1 deletion orderer/consensus/etcdraft/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestIsConsenterOfChannel(t *testing.T) {
name: "valid config block with cert mismatch",
configBlock: validBlock(),
certificate: certInsideConfigBlock[2:],
expectedError: cluster.NotInChannelError.Error(),
expectedError: cluster.ErrNotInChannel.Error(),
},
{
name: "valid config block with matching cert",
Expand Down

0 comments on commit f4b1a7e

Please sign in to comment.