Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions docs/streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,22 @@ dstream.foreachRDD { rdd =>
}
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
final Connection connection = createNewConnection(); // executed at the driver
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String record) {
connection.send(record); // executed at the worker
}
});
}
});
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendRecord(rdd):
Expand Down Expand Up @@ -1279,6 +1295,23 @@ dstream.foreachRDD { rdd =>
}
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String record) {
Connection connection = createNewConnection();
connection.send(record);
connection.close();
}
});
}
});
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendRecord(record):
Expand Down Expand Up @@ -1309,6 +1342,25 @@ dstream.foreachRDD { rdd =>
}
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
@Override
public void call(Iterator<String> partitionOfRecords) {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
}
});
}
});
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendPartition(iter):
Expand Down Expand Up @@ -1342,6 +1394,26 @@ dstream.foreachRDD { rdd =>
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}
dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
@Override
public void call(Iterator<String> partitionOfRecords) {
// ConnectionPool is a static, lazily initialized pool of connections
Connection connection = ConnectionPool.getConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
ConnectionPool.returnConnection(connection); // return to the pool for future reuse
}
});
}
});
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendPartition(iter):
Expand Down