Skip to content

Commit 9c38c5c

Browse files
authored
[loadgen] Close broadcast streams only after the workload has completed (#136)
#### Type of change - Bug fix #### Description - Fix bug in load-geneator: Close broadcast streams only after the workload has completed. This ensures that any pending messages in the stream’s internal queue are fully processed. Even if all transactions have already been submitted, additional time may be required for them to be transmitted. - Use a connection cache when opening all the connections in the connection manager to avoid re-opening the same connection multiple times. - Fix broken rebase from prior commit. #### Related issues - resolves #117 Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent 19bc714 commit 9c38c5c

File tree

11 files changed

+67
-38
lines changed

11 files changed

+67
-38
lines changed

integration/runner/runtime.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,14 +243,14 @@ func NewRuntime(t *testing.T, conf *Config) *CommitterRuntime {
243243
ConsensusType: ordererconn.Bft,
244244
})
245245
require.NoError(t, err)
246-
t.Cleanup(c.ordererStream.Close)
246+
t.Cleanup(c.ordererStream.CloseConnections)
247247

248248
c.sidecarClient, err = sidecarclient.New(&sidecarclient.Parameters{
249249
ChannelID: s.Policy.ChannelID,
250250
Client: test.NewTLSClientConfig(s.ClientTLS, s.Endpoints.Sidecar.Server),
251251
})
252252
require.NoError(t, err)
253-
t.Cleanup(c.sidecarClient.Close)
253+
t.Cleanup(c.sidecarClient.CloseConnections)
254254
return c
255255
}
256256

loadgen/adapters/orderer.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"golang.org/x/sync/errgroup"
1414

1515
"github.com/hyperledger/fabric-x-committer/loadgen/workload"
16+
"github.com/hyperledger/fabric-x-committer/utils/connection"
1617
"github.com/hyperledger/fabric-x-committer/utils/deliver"
1718
"github.com/hyperledger/fabric-x-committer/utils/test"
1819
)
@@ -39,7 +40,7 @@ func (c *OrdererAdapter) RunWorkload(ctx context.Context, txStream *workload.Str
3940
if err != nil {
4041
return errors.Wrap(err, "failed to create orderer clients")
4142
}
42-
defer client.Close()
43+
defer client.CloseConnections()
4344

4445
dCtx, dCancel := context.WithCancel(ctx)
4546
defer dCancel()
@@ -61,13 +62,19 @@ func (c *OrdererAdapter) RunWorkload(ctx context.Context, txStream *workload.Str
6162
})
6263
}
6364

64-
for range c.config.BroadcastParallelism {
65+
streams := make([]*test.BroadcastStream, c.config.BroadcastParallelism)
66+
for i := range streams {
67+
streams[i], err = test.NewBroadcastStream(gCtx, &c.config.Orderer)
68+
if err != nil {
69+
connection.CloseConnectionsLog(streams[:i]...)
70+
return errors.Wrap(err, "failed to create a broadcast stream")
71+
}
72+
}
73+
defer connection.CloseConnectionsLog(streams...)
74+
75+
for _, stream := range streams {
76+
stream := stream
6577
g.Go(func() error {
66-
stream, streamErr := test.NewBroadcastStream(gCtx, &c.config.Orderer)
67-
if streamErr != nil {
68-
return errors.Wrap(streamErr, "failed to create a broadcast stream")
69-
}
70-
defer stream.Close()
7178
return sendBlocks(gCtx, &c.commonAdapter, txStream, workload.MapToEnvelopeBatch, stream.SendBatch)
7279
})
7380
}

loadgen/adapters/sigverifier.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func (c *SvAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWi
6969
}
7070

7171
for _, stream := range streams {
72+
stream := stream
7273
g.Go(func() error {
7374
return sendBlocks(gCtx, &c.commonAdapter, txStream, workload.MapToVerifierBatch, stream.Send)
7475
})

loadgen/adapters/vcservice.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func (c *VcAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWi
7878
defer dCancel()
7979
g, gCtx := errgroup.WithContext(dCtx)
8080
for _, stream := range streams {
81+
stream := stream
8182
g.Go(func() error {
8283
return sendBlocks(ctx, &c.commonAdapter, txStream, workload.MapToVcBatch, stream.Send)
8384
})

loadgen/client_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ func TestLoadGenForOnlyOrderer(t *testing.T) {
302302
Endpoints: endpoints,
303303
},
304304
ChannelID: clientConf.LoadProfile.Transaction.Policy.ChannelID,
305+
Identity: clientConf.LoadProfile.Transaction.Policy.Identity,
305306
ConsensusType: ordererconn.Bft,
306307
},
307308
BroadcastParallelism: 5,

service/sidecar/sidecarclient/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ func New(config *Parameters) (*Client, error) {
7373
}, nil
7474
}
7575

76-
// Close closes all the connections.
77-
func (c *Client) Close() {
78-
c.ConnectionManager.Close()
76+
// CloseConnections closes all the connections.
77+
func (c *Client) CloseConnections() {
78+
c.ConnectionManager.CloseConnections()
7979
}
8080

8181
// Deliver start receiving blocks starting from config.StartBlkNum to config.OutputBlock.

utils/connection/client_util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func Connect(config *DialConfig) (*grpc.ClientConn, error) {
201201

202202
// OpenConnections opens connections with multiple remotes.
203203
func OpenConnections(config MultiClientConfig) ([]*grpc.ClientConn, error) {
204-
logger.Infof("Opening connections to %d endpoints: %v.\n", len(config.Endpoints), config.Endpoints)
204+
logger.Infof("Opening connections to %d endpoints: %v.", len(config.Endpoints), config.Endpoints)
205205
dialConfigs, err := NewDialConfigPerEndpoint(&config)
206206
if err != nil {
207207
return nil, errors.Wrapf(err, "error while creating dial configs")

utils/deliver/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ func New(config *ordererconn.Config) (*Client, error) {
5252
}, nil
5353
}
5454

55-
// Close closes all the connections for the client.
56-
func (s *Client) Close() {
57-
s.connectionManager.Close()
55+
// CloseConnections closes all the connections for the client.
56+
func (s *Client) CloseConnections() {
57+
s.connectionManager.CloseConnections()
5858
}
5959

6060
// UpdateConnections updates the connection config.

utils/deliver/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func TestBroadcastDeliver(t *testing.T) {
4848
conf.Connection.Endpoints = allEndpoints[:6]
4949
client, err := New(&conf)
5050
require.NoError(t, err)
51-
t.Cleanup(client.Close)
51+
t.Cleanup(client.CloseConnections)
5252

5353
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
5454
t.Cleanup(cancel)
@@ -152,7 +152,7 @@ func submit(
152152
defer cancel()
153153
stream, err := test.NewBroadcastStream(ctx, conf)
154154
require.NoError(t, err)
155-
defer stream.Close()
155+
defer stream.CloseConnections()
156156

157157
err = stream.SendBatch(workload.MapToEnvelopeBatch(0, []*protoloadgen.TX{tx}))
158158
if err != nil {
@@ -233,7 +233,7 @@ func waitUntilGrpcServerIsReady(ctx context.Context, t *testing.T, endpoint *con
233233

234234
func waitUntilGrpcServerIsDown(ctx context.Context, t *testing.T, endpoint *connection.Endpoint) {
235235
t.Helper()
236-
newConn, err := connection.Connect(connection.NewInsecureDialConfig(endpoint))
236+
newConn, err := connection.Connect(test.NewInsecureDialConfig(endpoint))
237237
require.NoError(t, err)
238238
defer connection.CloseConnectionsLog(newConn)
239239
test.WaitUntilGrpcServerIsDown(ctx, t, newConn)

utils/ordererconn/conn_manager.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"math"
1313
"math/rand"
1414
"slices"
15+
"strings"
1516
"sync"
1617
"sync/atomic"
1718

@@ -110,17 +111,26 @@ func (c *ConnectionManager) Update(config *ConnectionConfig) error {
110111

111112
// We pre create all the connections to ensure correct form.
112113
connections := make(map[string]*grpc.ClientConn)
113-
allAPis := []string{anyAPI, Broadcast, Deliver}
114+
// We use a connection cache to avoid opening the same connection multiple times.
115+
connCache := make(map[string]*grpc.ClientConn)
116+
allAPIs := []string{anyAPI, Broadcast, Deliver}
114117
for _, id := range append(getAllIDs(config.Endpoints), anyID) {
115-
for _, api := range allAPis {
118+
for _, api := range allAPIs {
116119
filter := aggregateFilter(WithAPI(api), WithID(id))
117-
conn, err := openConnection(config, filter)
118-
if errors.Is(err, ErrNoConnections) {
120+
endpoints := filterOrdererEndpoints(config.Endpoints, filter)
121+
if len(endpoints) == 0 {
119122
continue
120123
}
121-
if err != nil {
122-
closeConnection(connections)
123-
return err
124+
endpointsKey := makeEndpointsKey(endpoints)
125+
conn, connInCache := connCache[endpointsKey]
126+
if !connInCache {
127+
var err error
128+
conn, err = openConnection(config, endpoints)
129+
if err != nil {
130+
closeConnection(connections)
131+
return err
132+
}
133+
connCache[endpointsKey] = conn
124134
}
125135
connections[filterKey(filter)] = conn
126136
}
@@ -179,14 +189,8 @@ func getAllIDs(endpoints []*Endpoint) []uint32 {
179189

180190
func openConnection(
181191
conf *ConnectionConfig,
182-
filter ...ConnFilter,
192+
endpoints []*connection.Endpoint,
183193
) (*grpc.ClientConn, error) {
184-
key := aggregateFilter(filter...)
185-
186-
endpoints := filterOrdererEndpoints(conf.Endpoints, key)
187-
if len(endpoints) == 0 {
188-
return nil, ErrNoConnections
189-
}
190194
// We shuffle the endpoints for load balancing.
191195
shuffle(endpoints)
192196
logger.Infof("Opening connections to %d endpoints: %v.", len(endpoints), endpoints)
@@ -201,8 +205,17 @@ func openConnection(
201205
return connection.Connect(dialConfig)
202206
}
203207

204-
// Close closes all the connections.
205-
func (c *ConnectionManager) Close() {
208+
func makeEndpointsKey(endpoint []*connection.Endpoint) string {
209+
addresses := make([]string, len(endpoint))
210+
for i, e := range endpoint {
211+
addresses[i] = e.Address()
212+
}
213+
slices.Sort(addresses)
214+
return strings.Join(addresses, ";")
215+
}
216+
217+
// CloseConnections closes all the connections.
218+
func (c *ConnectionManager) CloseConnections() {
206219
c.lock.Lock()
207220
defer c.lock.Unlock()
208221
closeConnection(c.connections)

0 commit comments

Comments
 (0)