-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.sh
executable file
·138 lines (119 loc) · 3.54 KB
/
test.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env bash
## author: A. Krupicka (2015)
## You should make sure that the time is as synchronized as possible on the 'source' and 'target' machines
## It might even make sense to use the same node as source and target to avoid this completely
# Configuration variables.
# Host that streams the data file to the cluster (reached over ssh).
source="l5"

# Host that counts the messages coming out of the cluster (reached over ssh),
# the port its ncat listener is started on, and the file the listener fills.
target="l5"
target_port="5678"
target_outfile="/tmp/ebalancer_out"

# Cluster nodes the data file is sent to, and the port they are contacted on.
cluster=(
  "virtual-228"
  "virtual-229"
  "virtual-230"
  "virtual-231"
)
cluster_port="5600"

# Local file that captures what the remote 'await' call prints.
local_tmpfile="/tmp/ebalancer_streamdata"
# Streams a message file from the 'source' host to every cluster node and
# reports the throughput observed on the 'target' host.
# Globals:   source, target, cluster, cluster_port, local_tmpfile,
#            target_outfile (all read)
# Arguments: $1 - local file holding one message per line
# Outputs:   progress messages and the measured rate (or the target's reply
#            when it did not return a timestamp) on stdout
# Returns:   non-zero when the file cannot be read or the source host did not
#            report a numeric start timestamp
go ()
{
  local datafile=$1
  local len cluster_len remote_datafile waitpid cluster_decl
  local start_time end_time diff_time messages elapsed speed
  # Copy of the cluster list under the name stream() reads on the remote side.
  local -a hosts=("${cluster[@]}")

  if ! len=$(wc -l <"$datafile"); then
    echo "cannot read data file: $datafile" >&2
    return 1
  fi
  len=$(( len ))  # normalise: some wc implementations pad the count with blanks
  cluster_len=${#hosts[@]}
  # Only the file name is used remotely, so a local path with directories does
  # not require those directories to exist under /tmp on the source host.
  remote_datafile=/tmp/${datafile##*/}

  # Every ssh call ships all local function definitions so that the remote
  # shell can run the helper named at the end of the command string.
  if ! ssh "$source" "$(typeset -f); check_datafile $remote_datafile"; then
    echo "deploying stream data file... "
    scp "$datafile" "$source:$remote_datafile"
    echo "done"
  fi

  # Start counting on the target before streaming begins. The threshold leaves
  # a slack of 10 messages per cluster node.
  ssh "$target" "$(typeset -f); await $(( len * cluster_len - cluster_len * 10 )) $target_outfile" >"$local_tmpfile" &
  waitpid=$!

  # declare -p takes a variable *name*; its output is a declaration statement
  # that the remote shell executes to recreate the hosts array.
  cluster_decl=$(declare -p hosts)
  start_time=$(ssh "$source" "$cluster_decl; $(typeset -f); stream $remote_datafile $cluster_port")
  echo "stream finished, waiting for output (start_time=${start_time})"
  wait "$waitpid"
  end_time=$(cat "$local_tmpfile")

  if [[ ! $start_time =~ ^[0-9]+$ ]]; then
    echo "source node says: $start_time" >&2
    return 1
  fi
  if [[ $end_time =~ ^[0-9]+$ ]]; then # See if we got a number back
    diff_time=$(( end_time - start_time ))
    messages=$(( len * cluster_len ))
    # Timestamps are in milliseconds; report seconds with two decimals.
    elapsed=$(awk -v ms="$diff_time" 'BEGIN { printf "%.2f", ms / 1000 }')
    speed=$(awk -v n="$messages" -v ms="$diff_time" \
      'BEGIN { if (ms == 0) printf "inf"; else printf "%.2f", n * 1000 / ms }')
    echo "streamed $messages messages in ${elapsed}s ~ $speed/s"
  else
    echo "target node says: $end_time"
  fi
}
# These functions will be called over ssh.
# Reports whether a path exists. Called over ssh.
# Arguments: $1 - filename to check
# Returns:   0 when the path exists, 1 otherwise (including an empty path)
check_datafile ()
{
  # Quoted on purpose: an unquoted empty $1 would reduce this to `[ -e ]`,
  # a one-argument test that always succeeds.
  [ -e "$1" ]
}
# Sends a file to every cluster host in parallel and prints the start time.
# Called over ssh.
# Globals:   hosts (read) - array of cluster host names; the caller places its
#            declaration ahead of the function definitions in the ssh command
# Arguments: $1 - filename to stream from
#            $2 - port the cluster listens on
# Outputs:   the start timestamp in milliseconds since the epoch
# Returns:   0 once every nc process has finished
stream ()
{
  local host pid
  local -a ncpids=()

  date +%s%3N
  for host in "${hosts[@]}"; do
    nc "$host" "$2" <"$1" &
    ncpids+=("$!")
  done
  # Expand the whole array: a bare $ncpids is only its first element, which
  # would leave all but one transfer unawaited.
  for pid in "${ncpids[@]}"; do
    wait "$pid"
  done
  return 0
}
# Starts a persistent ncat listener whose received data is written to a file.
# Called over ssh on the 'target' host.
# Arguments: $1 - port to listen on
#            $2 - out file that collects everything the listener receives
# Outputs:   an installation hint on stdout when ncat is not available
# Returns:   1 when ncat is missing, 0 otherwise
await_init ()
{
  if ! command -v ncat >/dev/null 2>&1; then
    echo "Please install 'ncat' (*not* just nc) on the 'target' host (part of the nmap package, perhaps)"
    return 1
  fi
  # Detach all three standard streams from the ssh session: a background job
  # that keeps any of them open can prevent the ssh client from returning.
  # The redirections must precede '&'; anything after it is a separate command.
  nohup ncat -l -k "$1" >"$2" 2>/dev/null </dev/null &
}
# Stops every running ncat process and removes the collected output file.
# Called over ssh on the 'target' host.
# Arguments: $1 - out file to delete
# Returns:   status of the final rm
await_stop ()
{
  local -a pids
  # pidof prints the matching PIDs space-separated on one line.
  read -r -a pids <<<"$(pidof ncat)"
  # kill without a PID is a usage error, so skip it when nothing is running.
  if (( ${#pids[@]} > 0 )); then
    kill "${pids[@]}"
  fi
  # -f: a missing file (e.g. a repeated 'stop') is not an error.
  rm -f -- "$1"
}
# Polls a file until it holds enough lines, then prints the completion time.
# Called over ssh on the 'target' host.
# Arguments: $1 - amount of messages (lines) to wait for
#            $2 - output file to watch; it is emptied first
# Outputs:   on success, the current time in milliseconds since the epoch
#            (no trailing newline); on timeout, a message with the line count
#            reached so far
await ()
{
  local want=$1 outfile=$2
  local len max_time

  : >"$outfile" # Reset the message output file
  # Time budget: one millisecond for every 30 expected messages.
  max_time=$(( $(date +%s%3N) + want / 30 ))
  while true; do
    # stderr is redirected first so that a vanished file stays quiet; the
    # count then falls back to 0 instead of breaking the comparison below.
    len=$(wc -l 2>/dev/null <"$outfile") || len=0
    len=$(( len ))
    if (( len >= want )); then
      printf '%s' "$(date +%s%3N)"
      break
    elif (( $(date +%s%3N) > max_time )); then
      echo "time limit reached, got only $len messages"
      break
    else
      sleep 0.01
    fi
  done
}
# Command-line entry point: dispatch on the first argument.
#   init          - start the ncat listener on the target host
#   stop          - kill the listener and delete its output file
#   go <filename> - stream the given file and report the throughput
case "$1" in
  init)
    # The substitution is left unquoted so that word splitting collapses the
    # remote output onto a single line; -n suppresses the trailing newline.
    echo -n $(ssh "$target" "$(typeset -f); await_init $target_port $target_outfile")
    ;;
  stop)
    ssh "$target" "$(typeset -f); await_stop $target_outfile"
    ;;
  go)
    if [[ -n "$2" ]]; then
      # Quoted so that a file name containing whitespace stays one argument.
      go "$2"
    else
      echo "missing argument: file with messages to stream"
    fi
    ;;
  *)
    echo "Usage:"
    echo "$0 init | stop | go <filename>"
    ;;
esac