Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 96a926d

Browse files
committed
[FAB-8782] Resolve linter in peer/ord/comm
This change resolve gometalinter warnings in: pkg/fab/peer pkg/fab/orderer pkg/fab/comm Change-Id: I390f5d22f0f78e2e0a9109e8796cef50ea36bdc0 Signed-off-by: Troy Ronda <troy@troyronda.com>
1 parent 81a872e commit 96a926d

File tree

9 files changed

+94
-101
lines changed

9 files changed

+94
-101
lines changed

pkg/fab/comm/connection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvide
6363

6464
stream, err := streamProvider(grpcconn)
6565
if err != nil {
66-
if err := grpcconn.Close(); err != nil {
67-
logger.Warnf("error closing GRPC connection: %s", err)
66+
if closeErr := grpcconn.Close(); err != nil {
67+
logger.Warnf("error closing GRPC connection: %s", closeErr)
6868
}
6969
return nil, errors.Wrapf(err, "could not create stream to %s", url)
7070
}

pkg/fab/comm/connection_test.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"google.golang.org/grpc/keepalive"
1515

1616
fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context"
17-
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
1817
eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks"
1918
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
2019

@@ -37,11 +36,11 @@ func TestConnection(t *testing.T) {
3736
context := newMockContext()
3837
chConfig := fabmocks.NewMockChannelCfg(channelID)
3938

40-
conn, err := NewConnection(context, chConfig, testStream, "")
39+
_, err := NewConnection(context, chConfig, testStream, "")
4140
if err == nil {
4241
t.Fatalf("expected error creating new connection with empty URL")
4342
}
44-
conn, err = NewConnection(context, chConfig, testStream, "invalidhost:0000",
43+
_, err = NewConnection(context, chConfig, testStream, "invalidhost:0000",
4544
WithFailFast(true),
4645
WithCertificate(nil),
4746
WithHostOverride(""),
@@ -51,12 +50,12 @@ func TestConnection(t *testing.T) {
5150
if err == nil {
5251
t.Fatalf("expected error creating new connection with invalid URL")
5352
}
54-
conn, err = NewConnection(context, chConfig, invalidStream, peerURL)
53+
_, err = NewConnection(context, chConfig, invalidStream, peerURL)
5554
if err == nil {
5655
t.Fatalf("expected error creating new connection with invalid stream but got none")
5756
}
5857

59-
conn, err = NewConnection(context, chConfig, testStream, peerURL)
58+
conn, err := NewConnection(context, chConfig, testStream, peerURL)
6059
if err != nil {
6160
t.Fatalf("error creating new connection: %s", err)
6261
}
@@ -85,13 +84,6 @@ func TestConnection(t *testing.T) {
8584
var testServer *eventmocks.MockEventhubServer
8685
var endorserAddr []string
8786

88-
func newPeerConfig(peerURL string) *core.PeerConfig {
89-
return &core.PeerConfig{
90-
URL: peerURL,
91-
GRPCOptions: make(map[string]interface{}),
92-
}
93-
}
94-
9587
func newMockContext() fabcontext.Client {
9688
return fabmocks.NewMockContext(fabmocks.NewMockUser("test"))
9789
}

pkg/fab/comm/connector.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,9 @@ func (cc *CachingConnector) removeConn(target string) {
230230
if ok {
231231
delete(cc.index, c.conn)
232232
cc.conns.Delete(target)
233-
c.conn.Close()
233+
if err := c.conn.Close(); err != nil {
234+
logger.Debugf("unable to close connection [%s]", err)
235+
}
234236
}
235237
}
236238
}
@@ -316,7 +318,11 @@ func cache(conns map[string]*cachedConn, updateConn *cachedConn) {
316318
logger.Debugf("new connection in connection janitor")
317319
} else if c.conn != updateConn.conn {
318320
logger.Debugf("connection change in connection janitor")
319-
c.conn.Close() // Not blocking
321+
322+
if err := c.conn.Close(); err != nil {
323+
logger.Debugf("unable to close connection [%s]", err)
324+
}
325+
320326
} else {
321327
logger.Debugf("updating existing connection in connection janitor")
322328
}
@@ -347,9 +353,13 @@ func sweep(conns map[string]*cachedConn, idleTime time.Duration) []string {
347353
}
348354

349355
func closeConn(conn *grpc.ClientConn) {
350-
conn.Close()
356+
if err := conn.Close(); err != nil {
357+
logger.Debugf("unable to close connection [%s]", err)
358+
}
351359

352360
ctx, cancel := context.WithTimeout(context.Background(), connShutdownTimeout)
353-
waitConn(ctx, conn, connectivity.Shutdown)
361+
if err := waitConn(ctx, conn, connectivity.Shutdown); err != nil {
362+
logger.Debugf("unable to wait for connection close [%s]", err)
363+
}
354364
cancel()
355365
}

pkg/fab/comm/connector_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,17 +195,15 @@ func testDial(t *testing.T, wg *sync.WaitGroup, connector *CachingConnector, add
195195
conn, err := connector.DialContext(ctx, addr, grpc.WithInsecure())
196196
cancel()
197197
assert.Nil(t, err, "DialContext should have succeeded")
198+
defer connector.ReleaseConn(conn)
198199

199200
endorserClient := pb.NewEndorserClient(conn)
200-
ctx, cancel = context.WithTimeout(context.Background(), normalTimeout)
201201
proposal := pb.SignedProposal{}
202202
resp, err := endorserClient.ProcessProposal(context.Background(), &proposal)
203-
cancel()
204203

205204
assert.Nil(t, err, "peer process proposal should not have error")
206205
assert.Equal(t, int32(200), resp.GetResponse().Status)
207206

208207
randomSleep := rand.Intn(maxSleepBeforeRelease)
209208
time.Sleep(time.Duration(minSleepBeforeRelease)*time.Millisecond + time.Duration(randomSleep)*time.Millisecond)
210-
connector.ReleaseConn(conn)
211209
}

pkg/fab/orderer/orderer.go

Lines changed: 67 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,16 @@ var logger = logging.NewLogger("fabsdk/fab")
3333

3434
// Orderer allows a client to broadcast a transaction.
3535
type Orderer struct {
36-
config core.Config
37-
url string
38-
tlsCACert *x509.Certificate
39-
serverName string
40-
grpcDialOption []grpc.DialOption
41-
kap keepalive.ClientParameters
42-
dialTimeout time.Duration
43-
failFast bool
44-
transportCredentials credentials.TransportCredentials
45-
allowInsecure bool
46-
commManager fab.CommManager
36+
config core.Config
37+
url string
38+
serverName string
39+
tlsCACert *x509.Certificate
40+
grpcDialOption []grpc.DialOption
41+
kap keepalive.ClientParameters
42+
dialTimeout time.Duration
43+
failFast bool
44+
allowInsecure bool
45+
commManager fab.CommManager
4746
}
4847

4948
// Option describes a functional parameter for the New constructor
@@ -245,52 +244,58 @@ func (o *Orderer) SendBroadcast(ctx reqContext.Context, envelope *fab.SignedEnve
245244
}
246245
defer o.releaseConn(ctx, conn)
247246

248-
broadcastStream, err := ab.NewAtomicBroadcastClient(conn).Broadcast(ctx)
247+
broadcastClient, err := ab.NewAtomicBroadcastClient(conn).Broadcast(ctx)
249248
if err != nil {
250249
rpcStatus, ok := grpcstatus.FromError(err)
251250
if ok {
252251
err = status.NewFromGRPCStatus(rpcStatus)
253252
}
254253
return nil, errors.Wrap(err, "NewAtomicBroadcastClient failed")
255254
}
256-
done := make(chan bool)
257-
var broadcastErr error
258-
var broadcastStatus *common.Status
259255

260-
go func() {
261-
for {
262-
broadcastResponse, err := broadcastStream.Recv()
263-
logger.Debugf("Orderer.broadcastStream - response:%v, error:%v\n", broadcastResponse, err)
264-
if err != nil {
265-
rpcStatus, ok := grpcstatus.FromError(err)
266-
if ok {
267-
err = status.NewFromGRPCStatus(rpcStatus)
268-
}
269-
broadcastErr = errors.Wrap(err, "broadcast recv failed")
270-
done <- true
271-
return
272-
}
273-
broadcastStatus = &broadcastResponse.Status
274-
if broadcastResponse.Status == common.Status_SUCCESS {
275-
done <- true
276-
return
277-
}
278-
if broadcastResponse.Status != common.Status_SUCCESS {
279-
broadcastErr = status.New(status.OrdererServerStatus, int32(broadcastResponse.Status), broadcastResponse.Info, nil)
280-
done <- true
281-
return
282-
}
283-
}
284-
}()
285-
if err := broadcastStream.Send(&common.Envelope{
256+
responses := make(chan common.Status)
257+
errs := make(chan error, 1)
258+
259+
go broadcastStream(broadcastClient, responses, errs)
260+
261+
err = broadcastClient.Send(&common.Envelope{
286262
Payload: envelope.Payload,
287263
Signature: envelope.Signature,
288-
}); err != nil {
264+
})
265+
if err != nil {
289266
return nil, errors.Wrap(err, "failed to send envelope to orderer")
290267
}
291-
broadcastStream.CloseSend()
292-
<-done
293-
return broadcastStatus, broadcastErr
268+
if err = broadcastClient.CloseSend(); err != nil {
269+
logger.Debugf("unable to close broadcast client [%s]", err)
270+
}
271+
272+
select {
273+
case broadcastStatus := <-responses:
274+
return &broadcastStatus, nil
275+
case broadcastErr := <-errs:
276+
return nil, broadcastErr
277+
}
278+
}
279+
280+
func broadcastStream(broadcastClient ab.AtomicBroadcast_BroadcastClient, responses chan common.Status, errs chan error) {
281+
282+
broadcastResponse, err := broadcastClient.Recv()
283+
logger.Debugf("Orderer.broadcastStream - response:%v, error:%v", broadcastResponse, err)
284+
if err != nil {
285+
rpcStatus, ok := grpcstatus.FromError(err)
286+
if ok {
287+
err = status.NewFromGRPCStatus(rpcStatus)
288+
}
289+
errs <- errors.Wrap(err, "broadcast recv failed")
290+
return
291+
}
292+
293+
if broadcastResponse.Status != common.Status_SUCCESS {
294+
errs <- status.New(status.OrdererServerStatus, int32(broadcastResponse.Status), broadcastResponse.Info, nil)
295+
return
296+
}
297+
298+
responses <- broadcastResponse.Status
294299
}
295300

296301
// SendDeliver sends a deliver request to the ordering service and returns the
@@ -314,7 +319,7 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo
314319
}
315320

316321
// Create atomic broadcast client
317-
broadcastStream, err := ab.NewAtomicBroadcastClient(conn).Deliver(ctx)
322+
broadcastClient, err := ab.NewAtomicBroadcastClient(conn).Deliver(ctx)
318323
if err != nil {
319324
logger.Errorf("deliver failed [%s]", err)
320325
o.releaseConn(ctx, conn)
@@ -325,29 +330,33 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo
325330

326331
// Receive blocks from the GRPC stream and put them on the channel
327332
go func() {
328-
blockStream(broadcastStream, responses, errs)
333+
blockStream(broadcastClient, responses, errs)
329334
o.releaseConn(ctx, conn)
330335
}()
331336

332337
// Send block request envelope
333338
logger.Debugf("Requesting blocks from ordering service")
334-
if err := broadcastStream.Send(&common.Envelope{
339+
err = broadcastClient.Send(&common.Envelope{
335340
Payload: envelope.Payload,
336341
Signature: envelope.Signature,
337-
}); err != nil {
342+
})
343+
if err != nil {
338344
o.releaseConn(ctx, conn)
339345

340346
errs <- errors.Wrap(err, "failed to send block request to orderer")
341347
return responses, errs
342348
}
343-
broadcastStream.CloseSend()
349+
350+
if err = broadcastClient.CloseSend(); err != nil {
351+
logger.Debugf("unable to close deliver client [%s]", err)
352+
}
344353

345354
return responses, errs
346355
}
347356

348-
func blockStream(broadcastStream ab.AtomicBroadcast_DeliverClient, responses chan *common.Block, errs chan error) {
357+
func blockStream(deliverClient ab.AtomicBroadcast_DeliverClient, responses chan *common.Block, errs chan error) {
349358
for {
350-
response, err := broadcastStream.Recv()
359+
response, err := deliverClient.Recv()
351360
if err != nil {
352361
errs <- errors.Wrap(err, "recv from ordering service failed")
353362
return
@@ -357,11 +366,11 @@ func blockStream(broadcastStream ab.AtomicBroadcast_DeliverClient, responses cha
357366
// Seek operation success, no more resposes
358367
case *ab.DeliverResponse_Status:
359368
logger.Debugf("Received deliver response status from ordering service: %s", t.Status)
360-
if t.Status == common.Status_SUCCESS {
361-
close(responses)
369+
if t.Status != common.Status_SUCCESS {
370+
errs <- errors.Errorf("error status from ordering service %s", t.Status)
362371
return
363372
}
364-
errs <- errors.Errorf("error status from ordering service %s", t.Status)
373+
close(responses)
365374
return
366375

367376
// Response is a requested block
@@ -386,5 +395,7 @@ func (*defCommManager) DialContext(ctx reqContext.Context, target string, opts .
386395

387396
func (*defCommManager) ReleaseConn(conn *grpc.ClientConn) {
388397
logger.Debugf("ReleaseConn [%p]", conn)
389-
conn.Close()
398+
if err := conn.Close(); err != nil {
399+
logger.Debugf("unable to close connection [%s]", err)
400+
}
390401
}

pkg/fab/orderer/orderer_test.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,6 @@ import (
3434

3535
var testOrdererURL = "127.0.0.1:0"
3636

37-
var validRootCA = `-----BEGIN CERTIFICATE-----
38-
MIICYjCCAgmgAwIBAgIUB3CTDOU47sUC5K4kn/Caqnh114YwCgYIKoZIzj0EAwIw
39-
fzELMAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExFjAUBgNVBAcTDVNh
40-
biBGcmFuY2lzY28xHzAdBgNVBAoTFkludGVybmV0IFdpZGdldHMsIEluYy4xDDAK
41-
BgNVBAsTA1dXVzEUMBIGA1UEAxMLZXhhbXBsZS5jb20wHhcNMTYxMDEyMTkzMTAw
42-
WhcNMjExMDExMTkzMTAwWjB/MQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZv
43-
cm5pYTEWMBQGA1UEBxMNU2FuIEZyYW5jaXNjbzEfMB0GA1UEChMWSW50ZXJuZXQg
44-
V2lkZ2V0cywgSW5jLjEMMAoGA1UECxMDV1dXMRQwEgYDVQQDEwtleGFtcGxlLmNv
45-
bTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABKIH5b2JaSmqiQXHyqC+cmknICcF
46-
i5AddVjsQizDV6uZ4v6s+PWiJyzfA/rTtMvYAPq/yeEHpBUB1j053mxnpMujYzBh
47-
MA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBQXZ0I9
48-
qp6CP8TFHZ9bw5nRtZxIEDAfBgNVHSMEGDAWgBQXZ0I9qp6CP8TFHZ9bw5nRtZxI
49-
EDAKBggqhkjOPQQDAgNHADBEAiAHp5Rbp9Em1G/UmKn8WsCbqDfWecVbZPQj3RK4
50-
oG5kQQIgQAe4OOKYhJdh3f7URaKfGTf492/nmRmtK+ySKjpHSrU=
51-
-----END CERTIFICATE-----`
52-
5337
var ordererAddr string
5438
var ordererMockSrv *mocks.MockBroadcastServer
5539

pkg/fab/peer/peer.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package peer
88

99
import (
1010
reqContext "context"
11-
"fmt"
1211

1312
"crypto/x509"
1413

@@ -48,7 +47,6 @@ func New(config core.Config, opts ...Option) (*Peer, error) {
4847
config: config,
4948
commManager: &defCommManager{},
5049
}
51-
var err error
5250

5351
for _, opt := range opts {
5452
err := opt(peer)
@@ -70,11 +68,12 @@ func New(config core.Config, opts ...Option) (*Peer, error) {
7068
allowInsecure: peer.inSecure,
7169
commManager: peer.commManager,
7270
}
73-
peer.processor, err = newPeerEndorser(&endorseRequest)
71+
processor, err := newPeerEndorser(&endorseRequest)
7472

7573
if err != nil {
7674
return nil, err
7775
}
76+
peer.processor = processor
7877
}
7978

8079
return peer, nil
@@ -220,7 +219,7 @@ func (p *Peer) ProcessTransactionProposal(ctx reqContext.Context, proposal fab.P
220219
}
221220

222221
func (p *Peer) String() string {
223-
return fmt.Sprintf("%s", p.url)
222+
return p.url
224223
}
225224

226225
// PeersToTxnProcessors converts a slice of Peers to a slice of TxnProposalProcessors
@@ -243,5 +242,7 @@ func (*defCommManager) DialContext(ctx reqContext.Context, target string, opts .
243242

244243
func (*defCommManager) ReleaseConn(conn *grpc.ClientConn) {
245244
logger.Debugf("ReleaseConn [%p]", conn)
246-
conn.Close()
245+
if err := conn.Close(); err != nil {
246+
logger.Debugf("unable to close connection [%s]", err)
247+
}
247248
}

pkg/fab/peer/peerendorser.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ type peerEndorser struct {
3131
grpcDialOption []grpc.DialOption
3232
target string
3333
dialTimeout time.Duration
34-
failFast bool
3534
commManager fab.CommManager
3635
}
3736

0 commit comments

Comments
 (0)