Merge pull request #1607 from pomadchin/feature/parallelize-reads
Parallelize reads
echeipesh authored Aug 17, 2016
2 parents 51c469d + 4b7accc commit 4772fb1
Showing 17 changed files with 214 additions and 129 deletions.
5 changes: 4 additions & 1 deletion accumulo/src/main/resources/reference.conf
@@ -1 +1,4 @@
geotrellis.accumulo.catalog = "metadata"
geotrellis.accumulo {
catalog = "metadata"
threads.rdd.write = 32
}
@@ -1,31 +1,24 @@
package geotrellis.spark.io.accumulo

import java.util.UUID

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.spark.util._
import geotrellis.spark.io.hadoop._

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import org.apache.accumulo.core.data.{Key, Mutation, Value}
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat
import org.apache.accumulo.core.client.BatchWriterConfig

import scala.collection.JavaConversions._

import scalaz.concurrent.Task
import com.typesafe.config.ConfigFactory
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream._

import java.util.UUID
import java.util.concurrent.Executors

object AccumuloWriteStrategy {
val threads = ConfigFactory.load().getInt("geotrellis.accumulo.threads.rdd.write")

def DEFAULT = HdfsWriteStrategy("/geotrellis-ingest")
}

@@ -99,12 +92,16 @@ object HdfsWriteStrategy {
* @param config Configuration for the BatchWriters
*/
case class SocketWriteStrategy(
config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(32)
config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(AccumuloWriteStrategy.threads),
threads: Int = AccumuloWriteStrategy.threads
) extends AccumuloWriteStrategy {
def write(kvPairs: RDD[(Key, Value)], instance: AccumuloInstance, table: String): Unit = {
val serializeWrapper = KryoWrapper(config) // BatchWriterConfig is not java serializable
val kwThreads = KryoWrapper(threads)
kvPairs.foreachPartition { partition =>
val (config) = serializeWrapper.value
val poolSize = kwThreads.value
val pool = Executors.newFixedThreadPool(poolSize)
val config = serializeWrapper.value
val writer = instance.connector.createBatchWriter(table, config)

val mutations: Process[Task, Mutation] =
@@ -114,15 +111,15 @@ case class SocketWriteStrategy(
val mutation = new Mutation(key.getRow)
mutation.put(key.getColumnFamily, key.getColumnQualifier, System.currentTimeMillis(), value)
Some(mutation, iter)
} else {
None
}
}

val writeChannel = channel.lift { (mutation: Mutation) => Task { writer.addMutation(mutation) } }
val writes = mutations.tee(writeChannel)(tee.zipApply).map(Process.eval)
nondeterminism.njoin(maxOpen = 32, maxQueued = 32)(writes).run.unsafePerformSync
writer.close()
nondeterminism.njoin(maxOpen = poolSize, maxQueued = poolSize)(writes)(Strategy.Executor(pool)).run.unsafePerformSync
writer.close(); pool.shutdown()
}
}
}
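
The write path above captures the pattern this change applies throughout: a stream of mutations is zipped with a write channel, and the resulting tasks are joined nondeterministically over a dedicated fixed-size pool, so the configured thread count bounds the concurrency. A minimal standalone sketch of that pattern (scalaz-stream 0.8 API, as used in this commit; the doubling task is a placeholder for real I/O):

```scala
import java.util.concurrent.Executors
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, nondeterminism}

val threads = 32 // stands in for the configured geotrellis.*.threads value
val pool = Executors.newFixedThreadPool(threads)

// Each inner Process performs one unit of work; Task { ... }(pool) runs it on the pool.
val work: Process[Task, Process[Task, Int]] =
  Process.range(0, 100).map { i => Process.eval(Task { i * 2 }(pool)) }

// njoin runs at most `threads` tasks concurrently, scheduling them on the same pool.
val results = nondeterminism.njoin(maxOpen = threads, maxQueued = threads)(work)(Strategy.Executor(pool))
  .runLog.unsafePerformSync

pool.shutdown()
```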
6 changes: 6 additions & 0 deletions cassandra/src/main/resources/reference.conf
@@ -6,4 +6,10 @@ geotrellis.cassandra {
localDc = "datacenter1"
usedHostsPerRemoteDc = 0
allowRemoteDCsForLocalConsistencyLevel = false
threads {
rdd {
write = 32
read = 32
}
}
}
@@ -2,6 +2,7 @@ package geotrellis.spark.io.cassandra

import geotrellis.spark._
import geotrellis.spark.io._

import com.datastax.driver.core.{ResultSet, Session}
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{set, eq => eqs}
@@ -6,11 +6,17 @@ import geotrellis.spark.{Boundable, KeyBounds, LayerId}
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.io.index.{IndexRanges, MergeQueue}

import scalaz.concurrent.{Strategy, Task}
import scalaz.std.vector._
import scalaz.stream.{Process, nondeterminism}
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}
import org.apache.avro.Schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import java.util.concurrent.Executors

import com.typesafe.config.ConfigFactory

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -25,7 +31,8 @@ object CassandraRDDReader {
decomposeBounds: KeyBounds[K] => Seq[(Long, Long)],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
numPartitions: Option[Int] = None
numPartitions: Option[Int] = None,
threads: Int = ConfigFactory.load().getInt("geotrellis.cassandra.threads.rdd.read")
)(implicit sc: SparkContext): RDD[(K, V)] = {
if (queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)]

@@ -34,9 +41,9 @@ object CassandraRDDReader {
val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable

val ranges = if (queryKeyBounds.length > 1)
  MergeQueue(queryKeyBounds.flatMap(decomposeBounds))
else
  queryKeyBounds.flatMap(decomposeBounds)

val bins = IndexRanges.bin(ranges, numPartitions.getOrElse(sc.defaultParallelism))

@@ -47,36 +54,43 @@ object CassandraRDDReader {
.and(eqs("zoom", layerId.zoom))
.toString

val rdd: RDD[(K, V)] =
sc.parallelize(bins, bins.size)
  .mapPartitions { partition: Iterator[Seq[(Long, Long)]] =>
    instance.withSession { session =>
      val statement = session.prepare(query)
val pool = Executors.newFixedThreadPool(threads)

val result = partition map { seq =>
val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter =>
if (iter.hasNext) {
val (start, end) = iter.next()
Some((start to end).toIterator, iter)
} else None
}

val tileSeq: Iterator[Seq[(K, V)]] =
for {
rangeList <- partition // Unpack the one element of this partition, the rangeList.
range <- rangeList
index <- range._1 to range._2
} yield {
val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
if (row.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
if (filterIndexOnly) recs
else recs.filter { row => includeKey(row._1) }
} else {
Seq.empty
}
val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { iterator =>
Process.unfold(iterator) { iter =>
if (iter.hasNext) {
val index = iter.next()
val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
if (row.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
if (filterIndexOnly) Some(recs, iter)
else Some(recs.filter { row => includeKey(row._1) }, iter)
} else Some(Vector.empty, iter)
} else None
}
}

/** Close partition session */
(tileSeq ++ Iterator({
session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)]
})).flatten
nondeterminism.njoin(maxOpen = threads, maxQueued = threads) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync
}
}

rdd
/** Close partition session */
(result ++ Iterator({
pool.shutdown(); session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)]
})).flatten
}
}
}
}
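
The read side uses the same njoin pattern, but first unfolds each partition's index ranges into a lazy stream of per-index lookups. A minimal sketch of that unfold step (the `lookup` function is a placeholder for the bound-statement execution in the reader above):

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

// Placeholder for session.execute(statement.bind(index)) in the reader above.
def lookup(index: Long): Vector[(Long, String)] = Vector(index -> s"row-$index")

// Flatten (start, end) ranges into indices and unfold them into a stream,
// so results are produced lazily and can be njoined over a thread pool.
val ranges = Iterator((0L, 2L), (5L, 6L))
val reads: Process[Task, Vector[(Long, String)]] =
  Process.unfold(ranges.flatMap { case (s, e) => (s to e).iterator }) { iter =>
    if (iter.hasNext) Some((lookup(iter.next()), iter)) else None
  }
```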
Expand Up @@ -3,14 +3,16 @@ package geotrellis.spark.io.cassandra
import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs._
import geotrellis.spark.LayerId

import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.schemabuilder.SchemaBuilder
import com.datastax.driver.core.DataType._
import com.datastax.driver.core.ResultSet
import org.apache.spark.rdd.RDD

import scalaz.concurrent.Task
import com.typesafe.config.ConfigFactory
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, nondeterminism}

import java.nio.ByteBuffer
import java.util.concurrent.Executors

@@ -24,7 +26,8 @@ object CassandraRDDWriter {
layerId: LayerId,
decomposeKey: K => Long,
keyspace: String,
table: String
table: String,
threads: Int = ConfigFactory.load().getInt("geotrellis.cassandra.threads.rdd.write")
): Unit = {
implicit val sc = raster.sparkContext

@@ -71,8 +74,7 @@ object CassandraRDDWriter {
}
}

/** magic number 32; for no reason; just because */
val pool = Executors.newFixedThreadPool(32)
val pool = Executors.newFixedThreadPool(threads)

val write: ((java.lang.Long, ByteBuffer)) => Process[Task, ResultSet] = {
case (id, value) =>
@@ -81,13 +83,13 @@ object CassandraRDDWriter {
}(pool)
}

val results = nondeterminism.njoin(maxOpen = 32, maxQueued = 32) {
val results = nondeterminism.njoin(maxOpen = threads, maxQueued = threads) {
queries map write
} onComplete {
}(Strategy.Executor(pool)) onComplete {
Process eval Task {
session.closeAsync()
session.getCluster.closeAsync()
}
}(pool)
}

results.run.unsafePerformSync
32 changes: 32 additions & 0 deletions docs/spark/spark-io.md
@@ -200,3 +200,35 @@ val tile: Tile = nlcdReader.read(SpatialKey(1,2))
```

The idea is similar to the `LayerReader.reader` method, except that here we produce a reader for single tiles. Note that the layer metadata is read during construction of the `Reader[SpatialKey, Tile]` and cached for all subsequent tile reads.
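
For example, the reader can be constructed once and then used for many tile lookups without re-reading metadata. A minimal sketch, assuming a hypothetical S3 catalog location and layer name; the `S3ValueReader` constructor and import path here are illustrative assumptions, not taken from this commit:

```scala
import geotrellis.raster.Tile
import geotrellis.spark.{LayerId, SpatialKey}
import geotrellis.spark.io._
import geotrellis.spark.io.s3.S3ValueReader // hypothetical import path

// Metadata is fetched once, during construction of the reader...
val nlcdReader: Reader[SpatialKey, Tile] =
  S3ValueReader("my-bucket", "catalog").reader[SpatialKey, Tile](LayerId("nlcd", 12))

// ...so repeated reads skip the metadata lookup.
val t1: Tile = nlcdReader.read(SpatialKey(1, 2))
val t2: Tile = nlcdReader.read(SpatialKey(1, 3))
```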

## Reader and writer threads

Cassandra and S3 layer RDDReaders / RDDWriters are configurable in the number of threads they use. This is an application setting that can differ from machine to machine, depending on the resources available. The configuration can be set in the `reference.conf` / `application.conf` file of your app; default settings are available in the `reference.conf` file of each backend subproject (we use [Typesafe Config](https://github.com/typesafehub/config)).
For the File backend only the RDDReader is configurable; for Accumulo, only the RDDWriter (socket write strategy). For all backends, CollectionReaders are configurable as well.

Default configuration example:

```conf
geotrellis.accumulo.threads {
rdd.write = 32
collection.read = 32
}
geotrellis.file.threads {
rdd.read = 32
collection.read = 32
}
geotrellis.cassandra.threads {
collection.read = 32
rdd {
write = 32
read = 32
}
}
geotrellis.s3.threads {
collection.read = 8
rdd {
write = 8
read = 8
}
}
```
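
At runtime each backend resolves its thread count through Typesafe Config, as the readers and writers in this commit do; a minimal sketch:

```scala
import com.typesafe.config.ConfigFactory

// application.conf settings override the bundled reference.conf defaults above.
val cassandraReadThreads: Int =
  ConfigFactory.load().getInt("geotrellis.cassandra.threads.rdd.read")
val s3WriteThreads: Int =
  ConfigFactory.load().getInt("geotrellis.s3.threads.rdd.write")
```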
@@ -19,6 +19,7 @@ class HBaseSpaceTimeSpec
instance.getAdmin.disableTable("tiles")
instance.getAdmin.deleteTable("metadata")
instance.getAdmin.deleteTable("tiles")
instance.getAdmin.close()
}

lazy val instance = HBaseInstance(Seq("localhost"), "localhost")
@@ -18,6 +18,7 @@ class HBaseSpatialSpec
instance.getAdmin.disableTable("tiles")
instance.getAdmin.deleteTable("metadata")
instance.getAdmin.deleteTable("tiles")
instance.getAdmin.close()
}

lazy val instance = HBaseInstance(Seq("localhost"), "localhost")
@@ -19,6 +19,7 @@ class HBaseTileFeatureSpaceTimeSpec
instance.getAdmin.disableTable("tiles")
instance.getAdmin.deleteTable("metadata")
instance.getAdmin.deleteTable("tiles")
instance.getAdmin.close()
}

lazy val instance = HBaseInstance(Seq("localhost"), "localhost")
@@ -18,6 +18,7 @@ class HBaseTileFeatureSpatialSpec
instance.getAdmin.disableTable("tiles")
instance.getAdmin.deleteTable("metadata")
instance.getAdmin.deleteTable("tiles")
instance.getAdmin.close()
}

lazy val instance = HBaseInstance(Seq("localhost"), "localhost")
2 changes: 1 addition & 1 deletion project/Version.scala
@@ -31,7 +31,7 @@ object Version {
val monocle = "1.2.1"
val accumulo = "1.7.1"
val cassandra = "3.0.3"
val hbase = "1.2.1"
val hbase = "1.2.2"
lazy val hadoop = Environment.hadoopVersion
lazy val spark = Environment.sparkVersion
}
6 changes: 6 additions & 0 deletions s3/src/main/resources/reference.conf
@@ -0,0 +1,6 @@
geotrellis.s3.threads {
rdd {
write = 8
read = 8
}
}