Parallelize reads #1607

Merged · 8 commits · Aug 17, 2016

Changes from all commits:
5 changes: 4 additions & 1 deletion accumulo/src/main/resources/reference.conf
@@ -1 +1,4 @@
geotrellis.accumulo.catalog = "metadata"
geotrellis.accumulo {
catalog = "metadata"
threads.rdd.write = 32
}
@@ -1,31 +1,24 @@
package geotrellis.spark.io.accumulo

import java.util.UUID

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.spark.util._
import geotrellis.spark.io.hadoop._

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import org.apache.accumulo.core.data.{Key, Mutation, Value}
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat
import org.apache.accumulo.core.client.BatchWriterConfig

import scala.collection.JavaConversions._

import scalaz.concurrent.Task
import com.typesafe.config.ConfigFactory
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream._

import java.util.UUID
import java.util.concurrent.Executors

object AccumuloWriteStrategy {
val threads = ConfigFactory.load().getInt("geotrellis.accumulo.threads.rdd.write")

def DEFAULT = HdfsWriteStrategy("/geotrellis-ingest")
}

@@ -99,12 +92,16 @@ object HdfsWriteStrategy {
* @param config Configuration for the BatchWriters
*/
case class SocketWriteStrategy(
config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(32)
config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(AccumuloWriteStrategy.threads),
threads: Int = AccumuloWriteStrategy.threads
) extends AccumuloWriteStrategy {
def write(kvPairs: RDD[(Key, Value)], instance: AccumuloInstance, table: String): Unit = {
val serializeWrapper = KryoWrapper(config) // BatchWriterConfig is not java serializable
val kwThreads = KryoWrapper(threads)
kvPairs.foreachPartition { partition =>
val (config) = serializeWrapper.value
val poolSize = kwThreads.value
val pool = Executors.newFixedThreadPool(poolSize)
val config = serializeWrapper.value
val writer = instance.connector.createBatchWriter(table, config)

val mutations: Process[Task, Mutation] =
@@ -114,15 +111,15 @@ case class SocketWriteStrategy(
val mutation = new Mutation(key.getRow)
mutation.put(key.getColumnFamily, key.getColumnQualifier, System.currentTimeMillis(), value)
Some(mutation, iter)
} else {
} else {
None
}
}

val writeChannel = channel.lift { (mutation: Mutation) => Task { writer.addMutation(mutation) } }
val writes = mutations.tee(writeChannel)(tee.zipApply).map(Process.eval)
nondeterminism.njoin(maxOpen = 32, maxQueued = 32)(writes).run.unsafePerformSync
writer.close()
nondeterminism.njoin(maxOpen = poolSize, maxQueued = poolSize)(writes)(Strategy.Executor(pool)).run.unsafePerformSync
writer.close(); pool.shutdown()
}
}
}
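
The concurrency scheme introduced here, a fixed `Executors` pool sized from configuration with `nondeterminism.njoin` capping the number of in-flight tasks, can be sketched in isolation. The snippet below is only a minimal illustration of that pattern, not GeoTrellis code; `BoundedWrites`, `writeOne`, and the element type `A` are hypothetical placeholders.

```scala
import java.util.concurrent.Executors

import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, nondeterminism}

object BoundedWrites {
  /** Drain `items` through `writeOne`, keeping at most `threads` writes in flight. */
  def run[A](items: Iterator[A], threads: Int)(writeOne: A => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(threads)

    // Source stream: unfold the iterator, one element per step.
    val source: Process[Task, A] =
      Process.unfold(items)(it => if (it.hasNext) Some((it.next(), it)) else None)

    // Each element becomes an inner process that performs a single write on the pool.
    val writes: Process[Task, Process[Task, Unit]] =
      source.map(a => Process.eval(Task { writeOne(a) }(pool)))

    // njoin caps the number of open inner processes, i.e. the number of concurrent writes.
    nondeterminism.njoin(maxOpen = threads, maxQueued = threads)(writes)(Strategy.Executor(pool))
      .run.unsafePerformSync

    pool.shutdown()
  }
}
```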
6 changes: 6 additions & 0 deletions cassandra/src/main/resources/reference.conf
@@ -6,4 +6,10 @@ geotrellis.cassandra {
localDc = "datacenter1"
usedHostsPerRemoteDc = 0
allowRemoteDCsForLocalConsistencyLevel = false
threads {
rdd {
write = 32
read = 32
}
}
}
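
In HOCON (the TypeSafe Config format used here), nested blocks and dotted paths are interchangeable, so the values above could equivalently be written in the flatter style that the Accumulo `reference.conf` in this PR uses:

```conf
geotrellis.cassandra.threads.rdd.write = 32
geotrellis.cassandra.threads.rdd.read = 32
```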
@@ -2,6 +2,7 @@ package geotrellis.spark.io.cassandra

import geotrellis.spark._
import geotrellis.spark.io._

import com.datastax.driver.core.{ResultSet, Session}
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{set, eq => eqs}
@@ -6,11 +6,17 @@ import geotrellis.spark.{Boundable, KeyBounds, LayerId}
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.io.index.{IndexRanges, MergeQueue}

import scalaz.concurrent.{Strategy, Task}
import scalaz.std.vector._
import scalaz.stream.{Process, nondeterminism}
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}
import org.apache.avro.Schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import java.util.concurrent.Executors

import com.typesafe.config.ConfigFactory

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -25,7 +31,8 @@ object CassandraRDDReader {
decomposeBounds: KeyBounds[K] => Seq[(Long, Long)],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
numPartitions: Option[Int] = None
numPartitions: Option[Int] = None,
threads: Int = ConfigFactory.load().getInt("geotrellis.cassandra.threads.rdd.read")
)(implicit sc: SparkContext): RDD[(K, V)] = {
if (queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)]

@@ -34,9 +41,9 @@ object CassandraRDDReader {
val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable

val ranges = if (queryKeyBounds.length > 1)
MergeQueue(queryKeyBounds.flatMap(decomposeBounds))
else
queryKeyBounds.flatMap(decomposeBounds)
MergeQueue(queryKeyBounds.flatMap(decomposeBounds))
else
queryKeyBounds.flatMap(decomposeBounds)

val bins = IndexRanges.bin(ranges, numPartitions.getOrElse(sc.defaultParallelism))

@@ -47,36 +54,43 @@ object CassandraRDDReader {
.and(eqs("zoom", layerId.zoom))
.toString

val rdd: RDD[(K, V)] =
sc.parallelize(bins, bins.size)
.mapPartitions { partition: Iterator[Seq[(Long, Long)]] =>
instance.withSession { session =>
val statement = session.prepare(query)
sc.parallelize(bins, bins.size)
.mapPartitions { partition: Iterator[Seq[(Long, Long)]] =>
instance.withSession { session =>
val statement = session.prepare(query)
val pool = Executors.newFixedThreadPool(threads)

val result = partition map { seq =>
val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter =>
if (iter.hasNext) {
val (start, end) = iter.next()
Some((start to end).toIterator, iter)
} else None
}

val tileSeq: Iterator[Seq[(K, V)]] =
for {
rangeList <- partition // Unpack the one element of this partition, the rangeList.
range <- rangeList
index <- range._1 to range._2
} yield {
val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
if (row.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
if (filterIndexOnly) recs
else recs.filter { row => includeKey(row._1) }
} else {
Seq.empty
}
val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { iterator =>
Process.unfold(iterator) { iter =>
if (iter.hasNext) {
val index = iter.next()
val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
if (row.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
if (filterIndexOnly) Some(recs, iter)
else Some(recs.filter { row => includeKey(row._1) }, iter)
} else Some(Vector.empty, iter)
} else None
}
}

/** Close partition session */
(tileSeq ++ Iterator({
session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)]
})).flatten
nondeterminism.njoin(maxOpen = threads, maxQueued = threads) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync
}
}

rdd
/** Close partition session */
(result ++ Iterator({
pool.shutdown(); session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)]
})).flatten
}
}
}
}
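
The read path above follows a similar shape: key ranges are unfolded into inner streams, `njoin` runs at most `threads` of them at once on the pool, and `runFoldMap` concatenates the resulting vectors. Below is a minimal standalone sketch of that shape only; the `fetch` function is a hypothetical stand-in for the Cassandra lookup.

```scala
import java.util.concurrent.Executors

import scalaz.concurrent.{Strategy, Task}
import scalaz.std.vector._ // Monoid[Vector[A]] for runFoldMap
import scalaz.stream.{Process, nondeterminism}

object BoundedReads {
  /** Fetch every index in `ranges`, at most `threads` ranges in flight, and collect the results. */
  def run[V](ranges: Seq[(Long, Long)], threads: Int)(fetch: Long => Vector[V]): Vector[V] = {
    val pool = Executors.newFixedThreadPool(threads)

    // One inner process per (start, end) range; each emitted element is the batch for one index.
    val perRange: Process[Task, Process[Task, Vector[V]]] =
      Process.unfold(ranges.toIterator) { iter =>
        if (iter.hasNext) {
          val (start, end) = iter.next()
          val inner = Process.unfold((start to end).toIterator) { it =>
            if (it.hasNext) Some((fetch(it.next()), it)) else None
          }
          Some((inner, iter))
        } else None
      }

    try {
      // njoin bounds concurrency; runFoldMap concatenates all fetched vectors.
      nondeterminism.njoin(maxOpen = threads, maxQueued = threads)(perRange)(Strategy.Executor(pool))
        .runFoldMap(identity)
        .unsafePerformSync
    } finally pool.shutdown()
  }
}
```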
@@ -3,14 +3,16 @@ package geotrellis.spark.io.cassandra
import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs._
import geotrellis.spark.LayerId

import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.schemabuilder.SchemaBuilder
import com.datastax.driver.core.DataType._
import com.datastax.driver.core.ResultSet
import org.apache.spark.rdd.RDD

import scalaz.concurrent.Task
import com.typesafe.config.ConfigFactory
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, nondeterminism}

import java.nio.ByteBuffer
import java.util.concurrent.Executors

@@ -24,7 +26,8 @@ object CassandraRDDWriter {
layerId: LayerId,
decomposeKey: K => Long,
keyspace: String,
table: String
table: String,
threads: Int = ConfigFactory.load().getInt("geotrellis.cassandra.threads.rdd.write")
): Unit = {
implicit val sc = raster.sparkContext

@@ -71,8 +74,7 @@ object CassandraRDDWriter {
}
}

/** magic number 32; for no reason; just because */
val pool = Executors.newFixedThreadPool(32)
val pool = Executors.newFixedThreadPool(threads)

val write: ((java.lang.Long, ByteBuffer)) => Process[Task, ResultSet] = {
case (id, value) =>
@@ -81,13 +83,13 @@
}(pool)
}

val results = nondeterminism.njoin(maxOpen = 32, maxQueued = 32) {
val results = nondeterminism.njoin(maxOpen = threads, maxQueued = threads) {
queries map write
} onComplete {
}(Strategy.Executor(pool)) onComplete {
Process eval Task {
session.closeAsync()
session.getCluster.closeAsync()
}
}(pool)
}

results.run.unsafePerformSync
32 changes: 32 additions & 0 deletions docs/spark/spark-io.md
@@ -200,3 +200,35 @@ val tile: Tile = nlcdReader.read(SpatialKey(1,2))
```

The idea is similar to the `LayerReader.reader` method except in this case we're producing a reader for single tiles. Additionally it must be noted that the layer metadata is accessed during the construction of the `Reader[SpatialKey, Tile]` and saved for all future calls to read a tile.

## Reader and writer threads

The Cassandra and S3 layer RDDReaders / RDDWriters are configurable in the number of threads they use. This is an application-level setting that can differ from machine to machine, depending on the resources available. It can be set in the `reference.conf` / `application.conf` file of your application; the default settings live in the `reference.conf` file of each backend subproject (we use [TypeSafe Config](https://github.com/typesafehub/config)).
For the File backend only the RDDReader is configurable; for Accumulo, only the RDDWriter (socket write strategy). For all backends the CollectionReaders are configurable as well.

Default configuration example:

```conf
geotrellis.accumulo.threads {
rdd.write = 32
collection.read = 32
}
geotrellis.file.threads {
rdd.read = 32
collection.read = 32
}
geotrellis.cassandra.threads {
collection.read = 32
rdd {
write = 32
read = 32
}
}
geotrellis.s3.threads {
collection.read = 8
rdd {
write = 8
read = 8
}
}
```
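
The backends read these values with TypeSafe Config when the reader/writer objects are constructed, as the diffs above show. A minimal sketch of that lookup follows; the keys come from the configuration above, while the value names are illustrative.

```scala
import com.typesafe.config.ConfigFactory

// reference.conf in each backend subproject supplies the defaults;
// an application.conf on the classpath overrides them.
val conf = ConfigFactory.load()

val cassandraReadThreads  = conf.getInt("geotrellis.cassandra.threads.rdd.read")
val cassandraWriteThreads = conf.getInt("geotrellis.cassandra.threads.rdd.write")
val s3WriteThreads        = conf.getInt("geotrellis.s3.threads.rdd.write")
```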
@@ -19,6 +19,7 @@ class HBaseSpaceTimeSpec
instance.getAdmin.disableTable("tiles")
instance.getAdmin.deleteTable("metadata")
instance.getAdmin.deleteTable("tiles")
instance.getAdmin.close()
}

lazy val instance = HBaseInstance(Seq("localhost"), "localhost")
@@ -18,6 +18,7 @@ class HBaseSpatialSpec
instance.getAdmin.disableTable("tiles")
instance.getAdmin.deleteTable("metadata")
instance.getAdmin.deleteTable("tiles")
instance.getAdmin.close()
}

lazy val instance = HBaseInstance(Seq("localhost"), "localhost")
@@ -19,6 +19,7 @@ class HBaseTileFeatureSpaceTimeSpec
instance.getAdmin.disableTable("tiles")
instance.getAdmin.deleteTable("metadata")
instance.getAdmin.deleteTable("tiles")
instance.getAdmin.close()
}

lazy val instance = HBaseInstance(Seq("localhost"), "localhost")
@@ -18,6 +18,7 @@ class HBaseTileFeatureSpatialSpec
instance.getAdmin.disableTable("tiles")
instance.getAdmin.deleteTable("metadata")
instance.getAdmin.deleteTable("tiles")
instance.getAdmin.close()
}

lazy val instance = HBaseInstance(Seq("localhost"), "localhost")
2 changes: 1 addition & 1 deletion project/Version.scala
@@ -31,7 +31,7 @@ object Version {
val monocle = "1.2.1"
val accumulo = "1.7.1"
val cassandra = "3.0.3"
val hbase = "1.2.1"
val hbase = "1.2.2"
lazy val hadoop = Environment.hadoopVersion
lazy val spark = Environment.sparkVersion
}
6 changes: 6 additions & 0 deletions s3/src/main/resources/reference.conf
@@ -0,0 +1,6 @@
geotrellis.s3.threads {
rdd {
write = 8
read = 8
}
}