-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnode.go
329 lines (305 loc) · 7.26 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
package scr
import (
"fmt"
"math/rand"
)
const (
getPeerForMaxTries = 10
)
// AKA a floating castle in the sky
//
// There's a place in my mind
// No one knows where it hides
// And my fantasy is flying
// It's a castle in the sky
type Node struct {
Location V
S State
NextS State
CurrentBSize int
MaxBSize int
WaitActivity float64
// Data is the slice into the global slice, for rapid lookup. Presence
// of Data does not indicate memory ownership.
Data []*Data
// DataIndices are a slice of indices into the Data slice that this node
// owns. It is preallocated to a fixed size, indicating a numerical
// limit the node can hold. The content of the data it holds can be
// accessed by:
//
// Data[DataIndices[0:len(DataIndices)-1]]
//
// If the resulting value is nil, then it is empty
DataIndices []int
// This node's known peers
peers PeerList
// The f(X) value for this node (lower = closer to its data)
fx float64
// Sum sum of the square f(X) value (for std dev calculations)
fxsq float64
// Number of data pieces that go into the f(X) calculation
nfx int
}
// NewNodes begin at a random location if they have no data. Otherwise, they
// begin at a predetermined location based on the data they possess.
func NewNode(
dataCache []*Data,
myIdxs []int,
maxBSize int,
waitActivity float64,
peerList PeerList) *Node {
n := &Node{
S: State{id: StateJoin},
Data: dataCache,
DataIndices: myIdxs,
MaxBSize: maxBSize,
WaitActivity: waitActivity,
peers: peerList,
}
n.computeLocationAndCurrentSize()
return n
}
func (n *Node) getDataLocations() []V {
locs := make([]V, 0, len(n.DataIndices))
for _, idx := range n.DataIndices {
if n.Data[idx] == nil {
continue
}
locs = append(locs, n.Data[idx].Location)
}
return locs
}
func (n *Node) computeLocationAndCurrentSize() {
bsz := 0
locs := make([]V, 0, len(n.DataIndices))
weights := make([]float64, 0, len(n.DataIndices))
hasData := false
for _, idx := range n.DataIndices {
if n.Data[idx] == nil {
continue
}
hasData = true
locs = append(locs, n.Data[idx].Location)
weights = append(weights, 1)
bsz += n.Data[idx].DataSize
}
if hasData {
var err error
var fx float64
var fxsq float64
var nfx int
n.Location, fx, fxsq, nfx, err = SolveNonEuclideanMultifacilityLocationMonteCarlo(
locs,
weights,
0.1, 0.1,
2)
if err != nil {
// TODO: Yikes!
panic(err)
}
n.fx = fx
n.fxsq = fxsq
n.nfx = nfx
} else {
fmt.Printf("RandomVector location: nIdx=%v\n", len(n.DataIndices))
n.Location = RandomVector()
n.fx = 0
n.fxsq = 0
n.nfx = 0
}
n.CurrentBSize = bsz
}
func (n *Node) ifWaitOrJoin(f func()) bool {
if n.S.id == StateWait || n.S.id == StateJoin {
f()
return true
}
return false
}
func (n *Node) ifWaitOrJoinBool(f func() bool) (executed bool, val bool) {
if n.S.id == StateWait || n.S.id == StateJoin {
return true, f()
}
return false, false
}
func (n *Node) addPeer(o *Node) {
n.peers.AddPeer(n.Location, o)
}
func (n *Node) requestPeer() {
o := n.peers.GetRandomPeer()
if o == nil {
return
}
peer := o.getPeerFor(n)
if peer != nil {
// NODE INTERACTION: PEER HELLO
if o.ifWaitOrJoin(func() { o.addPeer(n) }) {
n.addPeer(peer)
}
}
}
func (n *Node) getPeerFor(o *Node) *Node {
return n.peers.GetRandomPeerThatsNot(o)
}
func (n *Node) exchangeData() (s string) {
o, dataIdx := n.peers.RandomlyFindPeerCloserToData(n.Location, n.Data, n.DataIndices)
if o == nil && dataIdx < 0 {
s = "could not exchange data (no peers)"
return
}
// dataIdx is data to transfer if >= 0; otherwise return
// if peer says "ok" then we forget our reference
if dataIdx < 0 {
s = "could not exchange data (no closer data)"
return
}
exec, ok := o.ifWaitOrJoinBool(func() bool { return o.exchangeDataReceive(n.Data[dataIdx]) })
if !exec {
s = fmt.Sprintf("unsuccessfully exchanged data at index %d to peer %v", dataIdx, o.Location)
return
}
if ok {
n.exchangeDataGive(dataIdx)
s = fmt.Sprintf("exchanged data at index %d to peer %v", dataIdx, o.Location)
}
// Exchange locations after data exchange
o.addPeer(n) // Not wrapped since exec succeeded before
n.addPeer(o)
return
}
func (n *Node) exchangeDataReceive(d *Data) bool {
// Too big of data
if d.DataSize+n.CurrentBSize > n.MaxBSize {
return false
}
availIdx := -1
for _, idx := range n.DataIndices {
if n.Data[idx] == nil {
availIdx = idx
break
}
}
// Too much qty of data
if availIdx < 0 {
return false
}
n.Data[availIdx] = d
n.computeLocationAndCurrentSize()
return true
}
func (n *Node) exchangeDataGive(idx int) {
n.Data[idx] = nil
n.computeLocationAndCurrentSize()
}
func (n *Node) nextFreeDataIndex() int {
availIdx := -1
for _, idx := range n.DataIndices {
if n.Data[idx] == nil {
availIdx = idx
break
}
}
return availIdx
}
func (n *Node) applyNewData(idx int) {
d := n.Data[idx]
// Too big of data -- remove it
if d.DataSize+n.CurrentBSize > n.MaxBSize {
n.Data[idx] = nil
}
n.computeLocationAndCurrentSize()
}
type coordinator interface {
FindOtherArbitraryNode(*Node) *Node
}
func (n *Node) ApplyState(c coordinator) string {
switch n.S.id {
case StateJoin:
o := c.FindOtherArbitraryNode(n)
s := "did not find other arbitrary node"
if o != nil {
// Exchange location information as well.
// May not be a mutual add.
//
// NODE INTERACTION: PEER HELLO
if o.ifWaitOrJoin(func() { o.addPeer(n) }) {
s = "found other arbitrary node"
n.addPeer(o)
}
}
n.NextS = State{
id: StateWait,
lastState: StateJoin,
}
return fmt.Sprintf("Node at %s joined and %s", n.Location, s)
case StateWait:
// Chance of the node spontaneously doing an action. We simply
// attempt to alternate through actions.
if rand.Float64() < n.WaitActivity {
if n.S.lastState == StateExchangeData {
n.NextS = State{
id: StateAskPeer,
lastState: StateWait,
}
return fmt.Sprintf("Node at %s will attempt asking peer", n.Location)
} else {
n.NextS = State{
id: StateExchangeData,
lastState: StateWait,
}
return fmt.Sprintf("Node at %s will attempt exchange", n.Location)
}
} else {
n.NextS = State{
id: StateWait,
lastState: StateWait,
}
return fmt.Sprintf("Node at %s waited", n.Location)
}
case StateExchangeData:
// Try exchanging data with a neighbor closer
// to that data's location
//
// NODE INTERACTION: EXCHANGE DATA
s := n.exchangeData()
n.NextS = State{
id: StateWait,
lastState: StateExchangeData,
}
return fmt.Sprintf("Node at %s %s", n.Location, s)
case StateAskPeer:
// Try asking for a peer
//
// NODE INTERACTION: REQUEST PEER
n.requestPeer()
n.NextS = State{
id: StateWait,
lastState: StateAskPeer,
}
return fmt.Sprintf("Node at %s asked peer", n.Location)
}
return fmt.Sprintf("Unknown action: %v", n.S)
}
func (n *Node) AdvanceState() {
n.S = n.NextS
}
func (n *Node) CountLastState(m []int) {
m[n.NextS.lastState] += 1
}
func (n *Node) PeerLocations() []V {
return n.peers.Locations()
}
// removePeer is used by Tocker via Simulation
func (n *Node) removePeer(o *Node) {
n.peers.RemovePeer(o)
}
type State struct {
id int
lastState int
}
const (
StateJoin int = iota
StateWait
StateExchangeData
StateAskPeer
)