\section{Conclusions} \label{sec:conclusions}
First, we summarize the answers to our research questions outlined in~\cref{subsec:research-questions-and-contributions}.
Then, we explain two areas of future work: finer-grained work-stealing and possibilities for inter-executor work-stealing in Spark.

The first research question is whether we can gain performance by specializing \textsc{LFTJ} to graphs.
We explored two possibilities: to use \textsc{CSR} to store the graph's edge relationship and to find a more suitable
intersection algorithm.
Using \textsc{CSR} is a big advantage for dense queries, where it leads to up to 11 times better performance.
Sparse queries profit less but are often 2 times faster.
We partially succeeded in finding a better intersection algorithm to use in the \textit{LeapfrogJoins} for the graph-pattern use-case.
We find that intersections tend to converge to their final size quickly when using binary intersections.
Therefore, multiple binary intersections driven by the small intermediary result are often faster than the n-ary intersection proposed in
the original Leapfrog Triejoin paper~\cite{lftj}.
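The idea of chaining binary intersections can be illustrated with a minimal Python sketch.
It is an illustration only and not the implementation used in this thesis: the function names are hypothetical, the inputs are assumed to be sorted and free of duplicates, and the seek is assumed to be a binary search via the standard \texttt{bisect} module.

```python
from bisect import bisect_left
from functools import reduce


def intersect_sorted(small, large):
    """Intersect two sorted, duplicate-free lists, driven by the smaller one."""
    result = []
    lo = 0
    for value in small:
        # Seek in the larger list, starting where the previous seek ended.
        lo = bisect_left(large, value, lo)
        if lo == len(large):
            break
        if large[lo] == value:
            result.append(value)
    return result


def chained_binary_intersection(lists):
    """Intersect many sorted lists by repeated binary intersections."""
    if not lists:
        return []
    ordered = sorted(lists, key=len)  # start with the smallest input
    # The intermediate result never grows, so it always drives the next step.
    return reduce(intersect_sorted, ordered[1:], ordered[0])
```

Because the intermediate result can only shrink, every later step in this sketch performs at most as many seeks as the current result has elements.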
Again, dense queries can profit while sparse queries show no difference or become slower.
The slow-down in sparse queries is likely caused by our second attempted specialization, namely materializing the intersections in
the hope of better use of the processor cache.
We cannot measure any difference in cache use.
Hence, this optimization is not worthwhile.
To answer the first research question: yes, \textsc{WCOJ} should be specialized to the graph-pattern
use-case.
Even straightforward optimizations like using a compact graph representation can lead to 11 times
better performance.

Our second research question concerns the scalability of \textsc{WCOJ}s in Spark.
We find that the Myria implementation of Shares, a provably optimal partitioning scheme suitable for
Spark, does not scale well.
In \cref{subsubsec:shares}, we demonstrate that Shares' scaling behaviour is inherently bad for graph-pattern matching because
it leads to a lot of duplicated work and its partitioning deteriorates to a full broadcast;
hence, it cannot save much communication or memory over simply replicating the whole graph on each machine,
in particular if we run multiple queries.
We conclude that caching the complete edge relationship of the graph on each machine at system start-up is the better design.
This fits well with \textsc{WCOJ}s because they need to sort and materialize their input relationships, which comes at a non-trivial
cost.
When we cache the edge relationship once and reuse it for all queries, we can amortize these costs.
On the basis of a replicated graph, we study two \textit{logical}, static partitioning schemes which we directly integrate into the
Leapfrog Triejoin.
First, we implement range-based partitioning on a single variable, which allows us to trade duplicated work against skew.
Second, we build a range-based and a hash-based Shares partitioning into the \textsc{LFTJ}.
We find that hash-based Shares is the best \textit{logical}, static partitioning scheme because it is the most skew-resilient,
although it replicates more work than partitioning on the first or second variable.
However, the replicated work leads to bad scaling of \textit{logical} Shares:
measured over three queries on three different datasets, Shares' strongest speedup is 25 for 96 workers.
On most queries, it does not reach a speedup of 20 using 48 cores.
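To put these numbers in perspective, one can divide the speedup $S$ by the number of workers $p$.
This ratio is commonly called parallel efficiency; the term and the symbols $S$, $p$ and $E$ are introduced here for illustration only and are not used elsewhere in this thesis.
\[
  E = \frac{S}{p}, \qquad E_{96} = \frac{25}{96} \approx 0.26 .
\]
Likewise, a speedup below 20 on 48 cores corresponds to $E < 20/48 \approx 0.42$.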
Based on this result, we conclude that static partitioning schemes are not scalable when using the \textsc{LFTJ} for graph-pattern
matching.
Therefore, we investigate the integration of work-stealing in Spark.
First, we demonstrate high potential for work-stealing as a partitioning scheme in our experiments on a single machine in Spark's local
mode in~\cref{subsec:scaling-graphWCOJ}.
It achieves super-linear speedup for the triangle query on two out of three datasets and near-linear speedup if we do not take sequential
overhead into account on the last dataset.
Bigger queries scale less well but often twice as well as Shares.
We identified residual skew as the main reason for the weaker scaling on 4-clique and 5-clique.
This is caused by our design decision to use one binding for the first variable as the minimal stealable job size.
We describe an alternative algorithm to generalize work-stealing to all levels of variables in~\cref{subsubsec:finer-grained-work-stealing}.
Then, we investigate the scaling of a simple, communication-free, distributed work-stealing integration into Spark.
We find that its scaling is similar to the local-mode version in 2 out of 4 measured queries.
A third query (5-clique on Orkut) scales roughly half as well as in the local version.
The last of the 4 queries (3-clique on LiveJournal) lags far behind the local version and does not seem to scale
to higher levels of parallelism.
This is partly caused by contention on the work-stealing queue, which can easily be mitigated
by using one queue per worker, which leaves queues uncontested until most tasks are finished;
we provide a more detailed analysis of the expected contention of this design in~\cref{subsubsec:finer-grained-work-stealing}.
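The one-queue-per-worker mitigation can be sketched as follows.
This is a minimal Python illustration under our own assumptions, not the Spark implementation: the names \texttt{WorkerQueues}, \texttt{next\_task} and \texttt{run} are hypothetical, tasks are plain values, and no new tasks are created while the workers run.

```python
import threading
from collections import deque


class WorkerQueues:
    """One task queue per worker; a worker steals only when its own queue is empty."""

    def __init__(self, tasks_per_worker):
        self.queues = [deque(tasks) for tasks in tasks_per_worker]
        self.locks = [threading.Lock() for _ in tasks_per_worker]

    def next_task(self, worker):
        """Return the next task for `worker`, or None if all queues are empty."""
        # Own queue first: its lock is uncontested until another worker runs dry.
        with self.locks[worker]:
            if self.queues[worker]:
                return self.queues[worker].popleft()
        # Own queue is empty: try to steal from the back of another queue.
        for victim in range(len(self.queues)):
            if victim == worker:
                continue
            with self.locks[victim]:
                if self.queues[victim]:
                    return self.queues[victim].pop()
        return None


def run(tasks_per_worker):
    """Process all tasks with one thread per worker; return what each worker handled."""
    shared = WorkerQueues(tasks_per_worker)
    done = [[] for _ in tasks_per_worker]

    def work(worker):
        while (task := shared.next_task(worker)) is not None:
            done[worker].append(task)

    threads = [threading.Thread(target=work, args=(w,)) for w in range(len(done))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return done
```

In this sketch a worker touches a foreign lock only after its own queue is empty, which is the property that keeps the queues uncontested for most of the run.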
However, we recognize that our distributed version works best on a cluster with a few
big machines, like the one we ran our experiments on.
In this setup, the graph is broken down into a few big pieces which are processed by
many tasks that share work.
If one uses a cluster that has many small machines, the graph is partitioned into many small pieces
which are worked upon by a few tasks each.
This setup is likely to lead to higher inter-worker skew.
We discuss possible solutions in~\cref{subsubsec:cluster-mode}.
We conclude that executor-internal work-stealing is a good starting point to fully distribute
\textsc{WCOJ}s on Spark.
It can reach speedups similar to the local version and beats Spark's built-in joins
by two orders of magnitude.
While far from perfect, it is, to the best of our knowledge, the best-scaling distributed \textsc{WCOJ} on a
Spark-like system.

The answers to our second set of research questions are: Shares scales better than less skew-resilient schemes but
does not scale well in absolute terms.
Therefore, dynamic work-sharing approaches are needed to achieve good speedup in a Spark-like system.
We find that work-stealing works well when used on a single machine but distribution to multiple
machines is difficult because it is not possible to communicate between executors in Spark.
We discuss possible solutions in~\cref{subsubsec:cluster-mode}.
The software implemented for this thesis is open source and can be found on GitHub at
\mbox{\url{https://github.com/cwida/edge-frames}}.
\subsection{Distributed work-stealing} \label{subsubsec:cluster-mode}
Work-stealing between executors inherently needs communication.
Spark does not offer any direct means of communication between two executors to the developer.
\textit{Broadcast variables} and \textit{Accumulables} offer one-way communication from the driver to the executors and from the executors to the driver, respectively.
One could assume that these can be combined into a two-way message channel.
However, this is not the case.
\textit{Broadcast variables} can transfer information to the executors but they do not offer the ability to check whether a message has
arrived.
The information they carry is accessed via a lazy attribute of a variable from the outer scope of the task closure.
Once the lazy attribute is evaluated by the task, it takes the current value at the driver and does not change anymore, even if
the value changes on the driver.
Hence, the executor can only guess when to evaluate the value, and if it does so too early, the variable cannot be reused.
Additionally, the number of broadcast variables needs to be known before the task is serialized because they are variables in the outer
scope of the closure to serialize.
Therefore, the number of broadcast variables cannot be infinite.
If we take the two arguments together, it becomes clear that it is not possible to use broadcast variables for dynamic communication.
There are two possibilities to enable executors to communicate in Spark.
First, one can use an external message-passing framework which can be set up during Spark's startup and connect all executors, e.g.
Akka\footnote{https://akka.io/}.
That is possible because often Spark executors are connected in the same network and otherwise the driver can be used as a proxy.
This approach is taken by Fractal (see~\cref{subsec:fractal}) for the use-case of inter-executor work-stealing.
We inspected the code and found that they use a straightforward Akka setup for a simple work-stealing approach via message-passing.
Their design is directly applicable to our setup.
Second, Spark uses remote procedure calls internally to schedule tasks on executors and to receive heartbeats and accumulables.
The same functionality could be used to implement work-stealing.
In particular, the \textit{BarrierTaskContext} uses remote procedure calls to implement its barrier method.
This remote procedure end-point could be made accessible to the user with minimal effort.
However, the original design of the barrier mode explicitly names further communication methods as a \textit{Non-goal}\footnote{
https://issues.apache.org/jira/browse/SPARK-24582?page=com.atlassian.jira.plugin.system.issuetabpanels\%3Aall-tabpanel\\
https://docs.google.com/document/d/1GvcYR6ZFto3dOnjfLjZMtTezX0W5VYN9w1l4-tQXaZk/edit\#heading=h.yqvxlbrdqkkb}.
We conclude that both options to introduce communication between different executors are suboptimal.
The first adds an additional dependency.
The second uses private and undocumented Spark internals.
We argue that the first approach is cleaner as it does not misuse parts of the Spark system in a way they have not been designed for
and leaves us in full control of the setup.
\subsection{Finer-grained work-stealing} \label{subsubsec:finer-grained-work-stealing}
In our experiment section~\ref{subsec:scaling-graphWCOJ}, we noted that work-stealing that
operates only on the first level of variable bindings can still lead to skew for bigger queries.
Therefore, we describe a work-stealing Leapfrog Triejoin algorithm that allows stealing work
on all levels.
We describe a possible design in two steps.
First, we explain under which circumstances to steal work.
Then, we describe how to steal work.
In the following, we call the process that steals work the \textit{thief} and the other process
the \textit{victim}.
We let each task start with its own bindings for the first variable, e.g. by assigning
a range to each task based on its partition number.
This can be implemented as a range filter in the \textit{LeapfrogJoin} of the first variable.
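A range assignment of this kind could look as follows.
The sketch is a hypothetical illustration: it assumes that the bindings of the first variable are dense vertex identifiers from $0$ to \texttt{num\_vertices}~$-~1$, and the function names are not taken from our implementation.

```python
def partition_range(partition, num_partitions, num_vertices):
    """Half-open range [lower, upper) of first-variable bindings for one task."""
    lower = partition * num_vertices // num_partitions
    upper = (partition + 1) * num_vertices // num_partitions
    return lower, upper


def in_range(value, bounds):
    """Range filter: keep only bindings that belong to this task."""
    lower, upper = bounds
    return lower <= value < upper
```

The integer arithmetic makes the ranges of all partitions disjoint and lets them cover every identifier exactly once, also when \texttt{num\_vertices} is not a multiple of \texttt{num\_partitions}.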
Once all of these initial bindings are processed, we start stealing work from other
\textit{LeapfrogTriejoins}.
It is beneficial to steal bindings of variables higher in the global order because this
maximizes the amount of work stolen.
A task is encoded as a prefix of variable bindings, e.g. if we steal work in a 5-clique query at
the third level, a prefix might be $[4, 1, 5]$.
If the work-stealing request successfully returns a binding, we set the state of all components
of the thief to the values of the prefix.
Then we run the normal Leapfrog Triejoin algorithm to generate all complete bindings for the stolen
prefix.
When all bindings have been produced, we steal work and repeat the process.
If a work-stealing request cannot find any work, the task finishes.
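The control flow described above can be summarized in a short Python sketch.
It only shows the loop structure: \texttt{steal\_prefix} and \texttt{expand\_prefix} are hypothetical callbacks that stand in for the work-stealing request and for the ordinary Leapfrog Triejoin run below a given prefix.

```python
def run_task(own_bindings, steal_prefix, expand_prefix):
    """Control flow of one task: own range first, then stolen prefixes.

    own_bindings:  first-variable bindings assigned to this task
    steal_prefix:  hypothetical callback; returns a stolen prefix (a list of
                   bindings) or None when no work can be found
    expand_prefix: hypothetical callback; runs the ordinary join below a
                   prefix and returns all complete bindings for it
    """
    results = []
    # Phase 1: process the initial bindings of the first variable.
    for binding in own_bindings:
        results.extend(expand_prefix([binding]))
    # Phase 2: steal prefixes until a request finds no work; then finish.
    while (prefix := steal_prefix()) is not None:
        results.extend(expand_prefix(prefix))
    return results
```

A stolen prefix such as $[4, 1, 5]$ is handled exactly like an own binding in this sketch; only the length of the prefix differs.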
We propose to steal work by accessing the \textit{LeapfrogJoin} instances of the victim to generate
the next binding for a variable at a given depth.
The victim then should not generate results for this binding.
We face four challenges for the question of how work is stolen.
First, the Leapfrog Triejoin encodes its state in the \textit{TrieIterator} components.
This state should not be changed when we steal work, except for the fact that a stolen
prefix is not considered by the victim.
Second, when we use the \textit{LeapfrogJoin} to steal work it is not guaranteed that the underlying
\textit{TrieIterators} are set to the correct level for this \textit{LeapfrogJoin}.
Third, we plan to use shared \textit{LeapfrogJoin} instances to implement work-stealing.
Therefore, this interface needs to become thread-safe.
Fourth, the \textit{LeapfrogJoin} interfaces need to be accessible to the thieves.
As in our current solution, we implement communication via a shared data structure.
This data structure allows access to all \textit{LeapfrogJoin} instances on the same worker.
If a task needs to steal work, it selects one of these \textit{LeapfrogJoin} instances.
We describe the solution to these problems in order.
For the first problem of not changing the state of the \textit{TrieIterators}, we
suggest adding new \textit{seek} and \textit{next} methods to the \textit{LeapfrogJoin} and
\textit{TrieIterator} interfaces which do not change the state but otherwise work exactly like the originals.
Additionally, the stateless \textit{next} method of the \textit{LeapfrogJoin} interface stores
the last value it returns.
The stateful method can use this value to seek to its upper bound, such that
values returned by the stateless version are skipped in the stateful version.
Then thieves can use the stateless versions and the owner of the interfaces uses the stateful
versions.
The second issue of ensuring that the \textit{LeapfrogJoin} uses the \textit{TrieIterators} with
the correct level is trivially solved by introducing one version per level of the stateless \textit{next}
and \textit{seek} methods of the \textit{TrieIterator} interface and storing which one to use in the
\textit{LeapfrogJoin}.
Introducing a thread-safe \textit{LeapfrogJoin} interface requires one lock per instance which
needs to be acquired before using any method and released after.
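The interplay of the stateful and the stateless \textit{next} method under one lock can be illustrated with a toy model.
The sketch below is not our \textit{LeapfrogJoin}: it assumes a materialized intersection stored as a sorted, duplicate-free Python list, and the class name \texttt{SharedLeapfrogJoin} and the method name \texttt{steal\_next} are hypothetical.

```python
import threading
from bisect import bisect_right


class SharedLeapfrogJoin:
    """Toy stand-in for a materialized LeapfrogJoin shared by victim and thieves."""

    def __init__(self, values):
        self.values = values          # sorted, duplicate-free bindings
        self.position = -1            # index of the owner's current binding
        self.last_stolen = None       # last value handed out to a thief
        self.lock = threading.Lock()  # one lock per instance

    def next(self):
        """Stateful next for the owner (victim); skips values taken by thieves."""
        with self.lock:
            self.position += 1
            if self.last_stolen is not None:
                # Seek to the upper bound of the last stolen value.
                self.position = max(self.position,
                                    bisect_right(self.values, self.last_stolen))
                self.last_stolen = None
            if self.position < len(self.values):
                return self.values[self.position]
            return None

    def steal_next(self):
        """Next for thieves: leaves the owner's position untouched."""
        with self.lock:
            if self.last_stolen is None:
                index = self.position + 1
            else:
                index = bisect_right(self.values, self.last_stolen)
            if index >= len(self.values):
                return None
            self.last_stolen = self.values[index]
            return self.last_stolen
```

In this model, thieves take a contiguous run of bindings directly behind the owner's current binding, and the owner jumps over that run on its next call.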
It is not necessary to lock the underlying \textit{TrieIterator} interfaces as we argue in the next
paragraphs.
We start our argument with the observation that we only need to consider cases where one thief and one victim
thread interfere with each other.
This is because only one thief can be active at any \textit{LeapfrogJoin} at any time due to
the necessity to hold the lock for this \textit{LeapfrogJoin}.
We observe that to use a specific \textit{LeapfrogJoin} to steal work from, the
victim needs to work on bindings which are later in the variable ordering.
Otherwise, the variables above are not bound and it is not possible to steal a complete prefix.
This is a precondition to be enforced on the thief's side when it chooses a \textit{LeapfrogJoin}
to steal work from.
Then, we require the victim to hold the lock of the respective \textit{LeapfrogJoin}
instance when it enters the \textit{trieJoinUp} method.
This guarantees that the victim \textit{LeapfrogTriejoin} cannot break the assumption above during the process
of work-stealing.
With these assumptions in place, we are assured that the victim does not call
any \textit{TrieIterator} methods which interfere with the thief.
There are two cases to consider.
First, the \textit{TrieIterator} is independent of the work-stealing process because it is not
part of the intersection of the \textit{LeapfrogJoin} which is used by the thief.
Second, the \textit{TrieIterator} is used at a deeper level than the one which is used by the
\textit{LeapfrogJoin} to steal work from.
In both cases, the use of the \textit{TrieIterator} does not interfere with the thief.
The changes outlined above allow sharing the \textit{LeapfrogJoin} instances of all
tasks of an executor to steal work at any level in the variable ordering.
This approach should lead to work-stealing jobs which are fine-grained enough
to be nearly skew free for queries of all sizes.
We end the section with a short discussion of lock contention.
The algorithm starts with totally uncontested locks until the first task finishes work in its
range.
Then the number of threads which could contend for locks grows linearly with the progress of the
algorithm towards finding all possible bindings, because a thread finishes its range before it can start
stealing work.
When threads start stealing work they can choose their victims such that they minimize lock contention.
Hence, locks are contested mostly between the thief and the victim.
For this case, we note that the thief chooses \textit{LeapfrogJoins} as high in the variable ordering
as possible; these are used less often by the victim, which spends most of its time with bindings
for lower variables.
Finally, a lock is held only briefly most of the time because the thief only needs to find
one further binding.
With materialized \textit{LeapfrogJoins}, the locked code section only
reads one value from an array, performs one or at most two \textit{seek} calls on the first level of a
\textit{TrieIterator}\footnote{These calls need two array reads. Hence, they are constant-time operations.}, and stores the value returned.