Shared variables
Normally every worker gets its own copy of the data shipped with each task, but shared variables are copied to each machine only once.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
The actual distribution of the variables is handled by the underlying Scala Spark.
broadcast1 = sc.broadcast('a')
broadcast2 = sc.broadcast('b')
broadcast3 = sc.broadcast([1, 2, 3])

func = Proc.new do |part, index|
  [
    broadcast1.value * index,    # string repetition: 'a' repeated index times
    broadcast2.value * index,
    broadcast3.value.reduce(:+)  # sum of the broadcast array
  ]
end

rdd = sc.parallelize(0..5, 4)
# bind serializes the broadcast references together with the Proc
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2, broadcast3: broadcast3)
rdd = rdd.map_partitions_with_index(func)
rdd.collect
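Broadcast values are read-only on the workers; a typical use is a lookup table shared by every task. Below is a minimal sketch in the same style (the lookup Hash and the expected output are illustrative only, not part of the library):

# Hypothetical lookup table, broadcast to every machine once
lookup = sc.broadcast({1 => 'one', 2 => 'two', 3 => 'three'})

func = Proc.new do |x|
  lookup.value.fetch(x, 'unknown')
end

rdd = sc.parallelize([1, 2, 3, 4], 2)
rdd = rdd.bind(lookup: lookup)
rdd.map(func).collect # => ["one", "two", "three", "unknown"]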
An Accumulator is a shared variable that can be accumulated, i.e., one that has a commutative and associative "add"
operation. Worker tasks on a Spark cluster can add values to an Accumulator with the +=
operator, but only the driver program is allowed to access its value, using value.
Updates from the workers get propagated automatically to the driver program.
There can be a slight synchronization delay (a stage finishes but the accumulator is not yet updated) because of this problem: http://stackoverflow.com/questions/28560133/ruby-server-java-scala-client-deadlock.
accum1 = $sc.accumulator(1)                      # default :+ operation
accum2 = $sc.accumulator(2, :*, 1)               # multiplication with zero value 1
accum3 = $sc.accumulator(3, lambda{|max, val| val > max ? val : max})  # keeps the maximum
accum1 += 1
accum2.add(2)
accum2.add(2)
accum2.add(2)
accum3.add(9)
accum3.add(6)
accum3.add(7)
accum1.value # => 2
accum2.value # => 16
accum3.value # => 9
# Called once per partition; index is the partition number
func = Proc.new do |_, index|
  accum1.add(1)
  accum2.add(2)
  accum3.add(index * 10)
end
rdd = $sc.parallelize(0..4, 4)
rdd = rdd.bind(accum1: accum1, accum2: accum2, accum3: accum3)
rdd = rdd.map_partitions_with_index(func)
rdd.collect
accum1.value # => 6
accum2.value # => 256
accum3.value # => 30
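Accumulators are handy as side counters inside a transformation, for example counting records that match some condition. A minimal sketch, assuming the same accumulator API as above (the predicate and the data are made up for illustration):

# Hypothetical sketch: count odd numbers while doubling every element
odd_count = $sc.accumulator(0)

func = Proc.new do |x|
  odd_count.add(1) if x.odd?
  x * 2
end

rdd = $sc.parallelize(1..10, 2)
rdd = rdd.bind(odd_count: odd_count)
rdd.map(func).collect
odd_count.value # => 5 (the value may lag briefly; see the synchronization note above)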