Skip to content

Commit 5ed3822

Browse files
author
Flavio Crisciani
committed
Optimize networkDB queue
Added some optimizations to reduce the messages in the queue: 1) On network join, the node executes a TCP sync with all the nodes that it is aware are part of the specific network. Previously, during this time the node was redistributing all the entries; this meant that if the network had 10K entries, the queue of the joining node would jump to 10K. The fix adds a flag on the network that avoids inserting any entry into the queue until the sync happens. Note that right now the flag is set in a best-effort way; there is no real check that at least one of the nodes succeeded. 2) Limit the number of messages to redistribute coming from a TCP sync. Introduced a threshold that limits the number of messages that are propagated; this disables the optimization in case of heavy load. Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
1 parent dd5a9ea commit 5ed3822

File tree

4 files changed

+51
-17
lines changed

4 files changed

+51
-17
lines changed

networkdb/broadcast.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ type tableEventMessage struct {
110110
tname string
111111
key string
112112
msg []byte
113-
node string
114113
}
115114

116115
func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
@@ -168,7 +167,6 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
168167
id: nid,
169168
tname: tname,
170169
key: key,
171-
node: nDB.config.NodeID,
172170
})
173171
return nil
174172
}

networkdb/cluster.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ const (
2424
retryInterval = 1 * time.Second
2525
nodeReapInterval = 24 * time.Hour
2626
nodeReapPeriod = 2 * time.Hour
27+
// considering a cluster with > 20 nodes and a drain speed of 100 msg/s
28+
// the following is roughly 1 minute
29+
maxQueueLenBroadcastOnSync = 500
2730
)
2831

2932
type logWriter struct{}
@@ -572,28 +575,33 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
572575

573576
var err error
574577
var networks []string
578+
var success bool
575579
for _, node := range nodes {
576580
if node == nDB.config.NodeID {
577581
continue
578582
}
579583
logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node)
580584
networks = nDB.findCommonNetworks(node)
581585
err = nDB.bulkSyncNode(networks, node, true)
582-
// if its periodic bulksync stop after the first successful sync
583-
if !all && err == nil {
584-
break
585-
}
586586
if err != nil {
587587
err = fmt.Errorf("bulk sync to node %s failed: %v", node, err)
588588
logrus.Warn(err.Error())
589+
} else {
590+
// bulk sync succeeded
591+
success = true
592+
// if its periodic bulksync stop after the first successful sync
593+
if !all {
594+
break
595+
}
589596
}
590597
}
591598

592-
if err != nil {
593-
return nil, err
599+
if success {
600+
// if at least one node sync succeeded
601+
return networks, nil
594602
}
595603

596-
return networks, nil
604+
return nil, err
597605
}
598606

599607
// Bulk sync all the table entries belonging to a set of networks to a

networkdb/delegate.go

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
142142
return true
143143
}
144144

145-
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
145+
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool {
146146
// Update our local clock if the received messages has newer time.
147147
nDB.tableClock.Witness(tEvent.LTime)
148148

@@ -175,6 +175,14 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
175175
nDB.Unlock()
176176
return false
177177
}
178+
} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
179+
nDB.Unlock()
180+
// We don't know the entry, the entry is being deleted and the message is an async message
181+
// In this case the safest approach is to ignore it, it is possible that the queue grew so much to
182+
// exceed the garbage collection time (the residual reap time that is in the message is not being
183+
// updated, to avoid inserting too many messages in the queue).
184+
// Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time
185+
return false
178186
}
179187

180188
e = &entry{
@@ -197,11 +205,17 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
197205
nDB.Unlock()
198206

199207
if err != nil && tEvent.Type == TableEventTypeDelete {
200-
// If it is a delete event and we did not have a state for it, don't propagate to the application
208+
// Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
209+
// We had saved the state so to speed up convergence and be able to avoid accepting create events.
210+
// Now we will rebroadcast the message if 2 conditions are met:
211+
// 1) we had already synced this network (during the network join)
212+
// 2) the residual reapTime is higher than 1/6 of the total reapTime.
201213
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
202-
// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
203-
// This also avoids that deletion of entries close to their garbage collection ends up circling around forever
204-
return e.reapTime > nDB.config.reapEntryInterval/6
214+
// most likely the cluster is already aware of it
215+
// This also reduces the possibility that deletion of entries close to their garbage collection ends up circling around
216+
// forever
217+
//logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
218+
return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
205219
}
206220

207221
var op opType
@@ -215,7 +229,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
215229
}
216230

217231
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
218-
return true
232+
return network.inSync
219233
}
220234

221235
func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
@@ -244,7 +258,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
244258
return
245259
}
246260

247-
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
261+
if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast {
248262
var err error
249263
buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
250264
if err != nil {
@@ -261,12 +275,16 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
261275
return
262276
}
263277

278+
// if the queue is over the threshold, avoid distributing information coming from TCP sync
279+
if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
280+
return
281+
}
282+
264283
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
265284
msg: buf,
266285
id: tEvent.NetworkID,
267286
tname: tEvent.TableName,
268287
key: tEvent.Key,
269-
node: tEvent.NodeName,
270288
})
271289
}
272290
}

networkdb/networkdb.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ type network struct {
130130
// Lamport time for the latest state of the entry.
131131
ltime serf.LamportTime
132132

133+
// Gets set to true after the first bulk sync happens
134+
inSync bool
135+
133136
// Node leave is in progress.
134137
leaving bool
135138

@@ -616,6 +619,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
616619
}
617620
nDB.addNetworkNode(nid, nDB.config.NodeID)
618621
networkNodes := nDB.networkNodes[nid]
622+
n = nodeNetworks[nid]
619623
nDB.Unlock()
620624

621625
if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
@@ -627,6 +631,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
627631
logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
628632
}
629633

634+
// Mark the network as being synced
635+
// note this is a best effort, we are not checking the result of the bulk sync
636+
nDB.Lock()
637+
n.inSync = true
638+
nDB.Unlock()
639+
630640
return nil
631641
}
632642

0 commit comments

Comments
 (0)