
Commit 2f3178c

Added more information about writing reliable receivers in the custom receiver guide.
1 parent 91aa5aa commit 2f3178c

File tree: 2 files changed (+75, -14 lines)


docs/configuration.md

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ title: Spark Configuration
 Spark provides three locations to configure the system:
 
 * [Spark properties](#spark-properties) control most application parameters and can be set by using
-  a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object, or through Java
+  a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object, or through Java
   system properties.
 * [Environment variables](#environment-variables) can be used to set per-machine settings, such as
   the IP address, through the `conf/spark-env.sh` script on each node.
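As a quick illustration of the first option above, here is a minimal sketch of setting Spark properties through `SparkConf`. The master URL, app name, and property value are placeholders, not part of this commit:

```scala
import org.apache.spark.SparkConf

// Spark properties set programmatically via SparkConf; the same keys
// could alternatively be supplied as Java system properties.
val conf = new SparkConf()
  .setMaster("local[2]")                 // placeholder master URL
  .setAppName("ConfigurationExample")    // placeholder app name
  .set("spark.executor.memory", "1g")    // an ordinary Spark property
```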

docs/streaming-custom-receivers.md

Lines changed: 74 additions & 13 deletions
@@ -7,25 +7,29 @@ Spark Streaming can receive streaming data from any arbitrary data source beyond
 the ones for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
 This requires the developer to implement a *receiver* that is customized for receiving data from
 the concerned data source. This guide walks through the process of implementing a custom receiver
-and using it in a Spark Streaming application.
+and using it in a Spark Streaming application. Note that custom receivers can be implemented
+in Scala or Java.
 
-### Implementing a Custom Receiver
+## Implementing a Custom Receiver
 
-This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
+This starts with implementing a **Receiver**
+([Scala doc](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver),
+[Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)).
 A custom receiver must extend this abstract class by implementing two methods:
 - `onStart()`: Things to do to start receiving data.
 - `onStop()`: Things to do to stop receiving data.
 
-Note that `onStart()` and `onStop()` must not block indefinitely. Typically, onStart() would start the threads
+Both `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
 that are responsible for receiving the data, and `onStop()` would ensure that receiving by those threads
 is stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
 should stop receiving data.
 
 Once the data is received, that data can be stored inside Spark
-by calling `store(data)`, which is a method provided by the
-[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class.
+by calling `store(data)`, which is a method provided by the Receiver class.
 There are a number of flavours of `store()` which allow you to store the received data
-record-at-a-time or as whole collection of objects / serialized bytes.
+record-at-a-time or as a whole collection of objects / serialized bytes. Note that the flavour of
+`store()` used to implement a receiver affects its reliability and fault-tolerance semantics.
+This is discussed [later](#receiver-reliability) in more detail.
 
 Any exception in the receiving threads should be caught and handled properly to avoid silent
 failures of the receiver. `restart(<exception>)` will restart the receiver by
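To make that contract concrete, here is a minimal Scala sketch along the lines of the full example later in this guide: `onStart()` spawns a receiving thread and returns immediately, the thread loops on `isStopped()` and hands each record to `store(...)`, and failures are surfaced through `restart(...)`. The socket source and the `CustomReceiver` name are illustrative assumptions:

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative receiver that reads lines of text from a socket.
class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start a thread that receives the data; onStart() itself must not block.
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do here: the receiving thread checks isStopped() and exits.
  }

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)                // store a single record at a time
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // Restart to reconnect once the source becomes available again.
      restart("Trying to connect again")
    } catch {
      case t: Throwable =>
        // Caught and reported rather than failing silently.
        restart("Error receiving data", t)
    }
  }
}
```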
@@ -158,7 +162,7 @@ public class JavaCustomReceiver extends Receiver<String> {
 </div>
 
 
-### Using the custom receiver in a Spark Streaming application
+## Using the custom receiver in a Spark Streaming application
 
 The custom receiver can be used in a Spark Streaming application by using
 `streamingContext.receiverStream(<instance of custom receiver>)`. This will create
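As a sketch of that wiring (the batch interval, app name, and the `CustomReceiver` from the earlier sketch are assumptions):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CustomReceiverApp")
val ssc = new StreamingContext(conf, Seconds(1))

// receiverStream() creates an input DStream backed by one instance
// of the custom receiver running on a worker.
val lines = ssc.receiverStream(new CustomReceiver("localhost", 9999))

val words = lines.flatMap(_.split(" "))
words.count().print()

ssc.start()
ssc.awaitTermination()
```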
@@ -191,9 +195,68 @@ The full source code is in the example [JavaCustomReceiver.java](https://github.
 </div>
 </div>
 
-
-
-### Implementing and Using a Custom Actor-based Receiver
+## Receiver Reliability
+As discussed briefly in the
+[Spark Streaming Programming Guide](streaming-programming-guide.html#receiver-reliability),
+there are two kinds of receivers based on their reliability and fault-tolerance semantics.
+
+1. *Reliable Receiver* - For *reliable sources* that allow sent data to be acknowledged, a
+   *reliable receiver* correctly acknowledges to the source that the data has been received
+   and stored in Spark reliably (that is, replicated successfully). Usually,
+   implementing this receiver involves careful consideration of the semantics of source
+   acknowledgements.
+1. *Unreliable Receiver* - These are receivers for unreliable sources that do not support
+   acknowledging. Even for reliable sources, one may implement an unreliable receiver that
+   does not go into the complexity of acknowledging correctly.
+
+To implement a *reliable receiver*, you have to use `store(multiple-records)` to store data.
+This flavour of `store` is a blocking call which returns only after all the given records have
+been stored inside Spark. If replication is enabled in the receiver's configured storage level
+(as it is by default), then this call returns after replication has completed.
+Thus it ensures that the data is reliably stored, and the receiver can then acknowledge the
+source appropriately. This ensures that no data is lost when the receiver fails in the middle
+of replicating data -- the buffered data will not be acknowledged and hence will later be resent
+by the source.
+
+An *unreliable receiver* does not have to implement any of this logic. It can simply receive
+records from the source and insert them one-at-a-time using `store(single-record)`. While it does
+not get the reliability guarantees of `store(multiple-records)`, it has the following advantages.
+
+- The system takes care of chunking the data into appropriately sized blocks (look for block
+  interval in the [Spark Streaming Programming Guide](streaming-programming-guide.html)).
+- The system takes care of controlling the receiving rates if rate limits have been specified.
+- Because of these two, *unreliable receivers* are simpler to implement than *reliable receivers*.
+
+The following table summarizes the characteristics of both types of receivers.
+
+<table class="table">
+<tr>
+  <th>Receiver Type</th>
+  <th>Characteristics</th>
+</tr>
+<tr>
+  <td><b>Unreliable Receivers</b></td>
+  <td>
+    Simple to implement.<br/>
+    System takes care of block generation and rate control.<br/>
+    No fault-tolerance guarantees; can lose data on receiver failure.
+  </td>
+</tr>
+<tr>
+  <td><b>Reliable Receivers</b></td>
+  <td>
+    Strong fault-tolerance guarantees; can ensure zero data loss.<br/>
+    Block generation and rate control to be handled by the receiver implementation.<br/>
+    Implementation complexity depends on the acknowledgement mechanisms of the source.
+  </td>
+</tr>
+</table>
+
+## Implementing and Using a Custom Actor-based Receiver
 
 Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
 receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
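To make the store-then-acknowledge ordering above concrete, here is a hedged sketch of a reliable receiver. The `SourceConnection` trait and its `fetchBatch()`/`ack()` methods are hypothetical stand-ins for a real source's client API; the point is only that the source is acknowledged after the blocking `store(...)` returns:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical source client API, defined only to keep the sketch self-contained.
trait SourceConnection {
  def fetchBatch(): Seq[String]       // blocking fetch of the next batch of records
  def ack(batch: Seq[String]): Unit   // acknowledge receipt of a batch to the source
}

class ReliableCustomReceiver(conn: SourceConnection)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    new Thread("Reliable Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() { }

  private def receive() {
    try {
      while (!isStopped) {
        val batch = conn.fetchBatch()
        // Blocking store: returns only after the whole batch has been stored
        // inside Spark (and replicated, if the storage level replicates).
        store(ArrayBuffer(batch: _*))
        // Acknowledge only after store() returns; if the receiver fails before
        // this point, the unacknowledged batch will be resent by the source.
        conn.ack(batch)
      }
    } catch {
      case t: Throwable => restart("Error receiving batch", t)
    }
  }
}
```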
@@ -217,5 +280,3 @@ val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
 
 See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
 for an end-to-end example.
-
-
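For completeness, the actor-based pattern referenced above looks roughly like this sketch, which mirrors the example in the linked guide (`CustomActor` and the message type are illustrative):

```scala
import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// An actor that forwards every String it receives into Spark
// via the store() method mixed in from ActorHelper.
class CustomActor extends Actor with ActorHelper {
  def receive = {
    case data: String => store(data)
  }
}

// Wired up as shown in the hunk's context line above:
//   val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
```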
