Merge pull request #1606 from pomadchin/feature/collections-api
Collections API
echeipesh authored Sep 1, 2016
2 parents 2077839 + 3c0134c commit 2e87199
Showing 68 changed files with 1,141 additions and 271 deletions.
7 changes: 5 additions & 2 deletions accumulo/src/main/resources/reference.conf
@@ -1,4 +1,7 @@
geotrellis.accumulo {
  catalog = "metadata"
- threads.rdd.write = 32
- }
+ threads {
+   collection.read = default
+   rdd.write = default
+ }
+ }
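Both pools now default to the literal default, which the new getThreads helper (used below in AccumuloWriteStrategy and AccumuloCollectionReader) resolves at runtime; the exact resolution rule is not shown in this diff. A minimal sketch of overriding these keys from code, assuming plain integers are still accepted (the previous value was the integer 32) and standard Typesafe Config layering (an application.conf entry would work the same way):

import com.typesafe.config.ConfigFactory

// Hypothetical override of the new pool-size keys; the key paths come from the
// reference.conf above, the integer values are illustrative assumptions.
val config = ConfigFactory.parseString(
  """
    |geotrellis.accumulo.threads {
    |  collection.read = 16
    |  rdd.write = 32
    |}
  """.stripMargin).withFallback(ConfigFactory.load())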
AccumuloAttributeStore.scala
@@ -12,7 +12,6 @@ import org.apache.accumulo.core.security.Authorizations
import org.apache.accumulo.core.data._
import org.apache.hadoop.io.Text


import scala.collection.JavaConversions._

object AccumuloAttributeStore {
AccumuloCollectionReader.scala (new file)
@@ -0,0 +1,68 @@
package geotrellis.spark.io.accumulo

import geotrellis.spark.io._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.{Boundable, KeyBounds}

import scalaz.std.vector._
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, channel, nondeterminism, tee}
import org.apache.accumulo.core.data.{Range => AccumuloRange}
import org.apache.accumulo.core.security.Authorizations
import org.apache.avro.Schema
import org.apache.hadoop.io.Text
import com.typesafe.config.ConfigFactory

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import java.util.concurrent.Executors

object AccumuloCollectionReader {
  def read[K: Boundable: AvroRecordCodec: ClassTag, V: AvroRecordCodec: ClassTag](
    table: String,
    columnFamily: Text,
    queryKeyBounds: Seq[KeyBounds[K]],
    decomposeBounds: KeyBounds[K] => Seq[AccumuloRange],
    filterIndexOnly: Boolean,
    writerSchema: Option[Schema] = None,
    threads: Int = ConfigFactory.load().getThreads("geotrellis.accumulo.threads.collection.read")
  )(implicit instance: AccumuloInstance): Seq[(K, V)] = {
    if(queryKeyBounds.isEmpty) return Seq.empty[(K, V)]

    val codec = KeyValueRecordCodec[K, V]
    val includeKey = (key: K) => queryKeyBounds.includeKey(key)

    val ranges = queryKeyBounds.flatMap(decomposeBounds).toIterator

    val pool = Executors.newFixedThreadPool(threads)

    val range: Process[Task, AccumuloRange] = Process.unfold(ranges) { iter =>
      if (iter.hasNext) Some(iter.next(), iter)
      else None
    }

    val readChannel = channel.lift { (range: AccumuloRange) => Task {
      val scanner = instance.connector.createScanner(table, new Authorizations())
      scanner.setRange(range)
      scanner.fetchColumnFamily(columnFamily)
      val result = scanner.iterator.map { case entry =>
        AvroEncoder.fromBinary(writerSchema.getOrElse(codec.schema), entry.getValue.get)(codec)
      }.flatMap { pairs: Vector[(K, V)] =>
        if(filterIndexOnly) pairs
        else pairs.filter { pair => includeKey(pair._1) }
      }.toVector
      scanner.close()
      result
    }(pool) }

    val read = range.tee(readChannel)(tee.zipApply).map(Process.eval)

    try {
      nondeterminism
        .njoin(maxOpen = threads, maxQueued = threads) { read }(Strategy.Executor(pool))
        .runFoldMap(identity).unsafePerformSync: Seq[(K, V)]
    } finally pool.shutdown()
  }
}
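The reader above fans work out with scalaz-stream: query key bounds are decomposed into Accumulo ranges, each range is scanned in a Task pinned to a fixed thread pool, and nondeterminism.njoin merges the per-range vectors back into one Seq. A standalone sketch of the same pattern, not part of this commit, with integer squaring standing in for the range scan:

import java.util.concurrent.Executors

import scalaz.std.vector._
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, channel, nondeterminism, tee}

object NJoinSketch extends App {
  val threads = 4
  val pool = Executors.newFixedThreadPool(threads)

  // Source stream of inputs, mirroring Process.unfold over the range iterator above.
  val inputs: Process[Task, Int] = Process.unfold((1 to 10).iterator) { it =>
    if (it.hasNext) Some((it.next(), it)) else None
  }

  // Channel of Tasks executed on the dedicated pool (the "scan" here is just squaring).
  val work = channel.lift { (i: Int) => Task { Vector(i * i) }(pool) }

  val jobs = inputs.tee(work)(tee.zipApply).map(Process.eval)

  val result: Vector[Int] =
    try {
      nondeterminism
        .njoin(maxOpen = threads, maxQueued = threads)(jobs)(Strategy.Executor(pool))
        .runFoldMap(identity).unsafePerformSync
    } finally pool.shutdown()

  println(result)
}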
AccumuloLayerCollectionReader.scala (new file)
@@ -0,0 +1,47 @@
package geotrellis.spark.io.accumulo

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.avro._
import geotrellis.util._

import org.apache.accumulo.core.data.{Range => AccumuloRange}
import org.apache.hadoop.io.Text
import spray.json._

import scala.reflect._

class AccumuloLayerCollectionReader(val attributeStore: AttributeStore)(implicit instance: AccumuloInstance) extends CollectionLayerReader[LayerId] {

  def read[
    K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
    V: AvroRecordCodec: ClassTag,
    M: JsonFormat: GetComponent[?, Bounds[K]]
  ](id: LayerId, rasterQuery: LayerQuery[K, M], filterIndexOnly: Boolean) = {
    if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)

    val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
      attributeStore.readLayerAttributes[AccumuloLayerHeader, M, K](id)
    } catch {
      case e: AttributeNotFoundError => throw new LayerReadError(id).initCause(e)
    }

    val queryKeyBounds = rasterQuery(metadata)

    val decompose = (bounds: KeyBounds[K]) =>
      keyIndex.indexRanges(bounds).map { case (min, max) =>
        new AccumuloRange(new Text(AccumuloKeyEncoder.long2Bytes(min)), new Text(AccumuloKeyEncoder.long2Bytes(max)))
      }

    val seq = AccumuloCollectionReader.read[K, V](header.tileTable, columnFamily(id), queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema))
    new ContextCollection(seq, metadata)
  }
}

object AccumuloLayerCollectionReader {
  def apply(attributeStore: AccumuloAttributeStore)(implicit instance: AccumuloInstance): AccumuloLayerCollectionReader =
    new AccumuloLayerCollectionReader(attributeStore)

  def apply(implicit instance: AccumuloInstance): AccumuloLayerCollectionReader =
    new AccumuloLayerCollectionReader(AccumuloAttributeStore(instance.connector))
}
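For context (not part of the diff), a minimal usage sketch: the collection reader mirrors the RDD-based AccumuloLayerReader but materializes the queried tiles locally as a ContextCollection, so no SparkContext is needed. The connection details, the layer name and zoom, and the bare new LayerQuery[...] construction are assumptions for illustration.

import geotrellis.raster.Tile
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.accumulo._
import org.apache.accumulo.core.client.security.tokens.PasswordToken

// Hypothetical cluster connection; substitute your own instance, zookeeper and credentials.
implicit val instance: AccumuloInstance =
  AccumuloInstance("gis", "zookeeper-host", "user", new PasswordToken("password"))

val reader = AccumuloLayerCollectionReader(instance)

// Reads the queried layer into local memory instead of an RDD.
val layer = reader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](
  LayerId("example-layer", 11),
  new LayerQuery[SpatialKey, TileLayerMetadata[SpatialKey]],
  filterIndexOnly = false)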
AccumuloWriteStrategy.scala
@@ -1,6 +1,7 @@
package geotrellis.spark.io.accumulo

import geotrellis.spark.util._
import geotrellis.spark.io._
import geotrellis.spark.io.hadoop._

import org.apache.hadoop.mapreduce.Job
@@ -17,7 +18,7 @@ import java.util.UUID
import java.util.concurrent.Executors

object AccumuloWriteStrategy {
- val threads = ConfigFactory.load().getInt("geotrellis.accumulo.threads.rdd.write")
+ val threads = ConfigFactory.load().getThreads("geotrellis.accumulo.threads.rdd.write")

def DEFAULT = HdfsWriteStrategy("/geotrellis-ingest")
}
@@ -116,7 +117,7 @@ case class SocketWriteStrategy(
}
}

- val writeChannel = channel.lift { (mutation: Mutation) => Task { writer.addMutation(mutation) } }
+ val writeChannel = channel.lift { (mutation: Mutation) => Task { writer.addMutation(mutation) } (pool) }
val writes = mutations.tee(writeChannel)(tee.zipApply).map(Process.eval)
nondeterminism.njoin(maxOpen = poolSize, maxQueued = poolSize)(writes)(Strategy.Executor(pool)).run.unsafePerformSync
writer.close(); pool.shutdown()
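The one-line change above is about thread placement: with the explicit (pool) argument the mutation-writing Task runs on the strategy's dedicated executor instead of scalaz's default one. A tiny standalone illustration of the idiom (not from this commit):

import java.util.concurrent.Executors
import scalaz.concurrent.Task

val pool = Executors.newFixedThreadPool(4)

// The second argument list pins execution to `pool`; omitting it falls back to
// scalaz's default executor service.
val onPool = Task { Thread.currentThread.getName } (pool)
println(onPool.unsafePerformSync)

pool.shutdown()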
AccumuloSpaceTimeSpec.scala
@@ -20,12 +20,13 @@ class AccumuloSpaceTimeSpec
implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
+ lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = CoordinateSpaceTime
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
AccumuloSpatialSpec.scala
@@ -16,14 +16,14 @@ class AccumuloSpatialSpec

implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
+ lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
AccumuloTileFeatureSpaceTimeSpec.scala
@@ -21,12 +21,13 @@ class AccumuloTileFeatureSpaceTimeSpec
implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
+ lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = CoordinateSpaceTime
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
AccumuloTileFeatureSpatialSpec.scala
@@ -17,14 +17,14 @@ class AccumuloTileFeatureSpatialSpec

implicit lazy val instance = MockAccumuloInstance()

lazy val reader = AccumuloLayerReader(instance)
+ lazy val creader = AccumuloLayerCollectionReader(instance)
lazy val writer = AccumuloLayerWriter(instance, "tiles", SocketWriteStrategy())
lazy val deleter = AccumuloLayerDeleter(instance)
lazy val reindexer = AccumuloLayerReindexer(instance, SocketWriteStrategy())
lazy val updater = AccumuloLayerUpdater(instance, SocketWriteStrategy())
lazy val tiles = AccumuloValueReader(instance)
lazy val sample = AllOnesTestFile
lazy val copier = AccumuloLayerCopier(instance, reader, writer)
lazy val mover = AccumuloLayerMover(copier, deleter)
}
5 changes: 3 additions & 2 deletions cassandra/src/main/resources/reference.conf
@@ -7,9 +7,10 @@ geotrellis.cassandra {
usedHostsPerRemoteDc = 0
allowRemoteDCsForLocalConsistencyLevel = false
threads {
+ collection.read = default
rdd {
- write = 32
- read = 32
+ write = default
+ read = default
}
}
}
CassandraCollectionReader.scala (new file)
@@ -0,0 +1,62 @@
package geotrellis.spark.io.cassandra

import geotrellis.spark.{Boundable, KeyBounds, LayerId}
import geotrellis.spark.io._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.io.index.MergeQueue
import geotrellis.spark.util.KryoWrapper

import org.apache.avro.Schema
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}
import com.typesafe.config.ConfigFactory

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

object CassandraCollectionReader {
  def read[K: Boundable : AvroRecordCodec : ClassTag, V: AvroRecordCodec : ClassTag](
    instance: CassandraInstance,
    keyspace: String,
    table: String,
    layerId: LayerId,
    queryKeyBounds: Seq[KeyBounds[K]],
    decomposeBounds: KeyBounds[K] => Seq[(Long, Long)],
    filterIndexOnly: Boolean,
    writerSchema: Option[Schema] = None,
    threads: Int = ConfigFactory.load().getThreads("geotrellis.cassandra.threads.collection.read")
  ): Seq[(K, V)] = {
    if (queryKeyBounds.isEmpty) return Seq.empty[(K, V)]

    val includeKey = (key: K) => queryKeyBounds.includeKey(key)
    val _recordCodec = KeyValueRecordCodec[K, V]
    val kwWriterSchema = KryoWrapper(writerSchema) // Avro Schema is not Serializable

    val ranges = if (queryKeyBounds.length > 1)
      MergeQueue(queryKeyBounds.flatMap(decomposeBounds))
    else
      queryKeyBounds.flatMap(decomposeBounds)

    val query = QueryBuilder.select("value")
      .from(keyspace, table)
      .where(eqs("key", QueryBuilder.bindMarker()))
      .and(eqs("name", layerId.name))
      .and(eqs("zoom", layerId.zoom))
      .toString

    instance.withSessionDo { session =>
      val statement = session.prepare(query)

      LayerReader.njoin[K, V](ranges.toIterator, threads){ index: Long =>
        val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
        if (row.nonEmpty) {
          val bytes = row.one().getBytes("value").array()
          val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
          if (filterIndexOnly) recs
          else recs.filter { row => includeKey(row._1) }
        } else Vector.empty
      }
    }: Seq[(K, V)]
  }
}
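Unlike the Accumulo reader, this one coalesces the decomposed (start, end) index ranges with MergeQueue before querying and delegates the threaded fan-out to the shared LayerReader.njoin helper. A hypothetical illustration of the MergeQueue call, assuming it merges overlapping ranges so the same index is not fetched twice:

import geotrellis.spark.io.index.MergeQueue

// Illustrative input only; the exact merge semantics are an assumption here.
val ranges: Seq[(Long, Long)] = Seq((0L, 5L), (4L, 10L), (20L, 25L))
val merged = MergeQueue(ranges) // expected to cover the same indices with fewer, non-overlapping ranges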
CassandraLayerCollectionReader.scala (new file)
@@ -0,0 +1,42 @@
package geotrellis.spark.io.cassandra

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.avro._
import geotrellis.util._

import spray.json._

import scala.reflect._

class CassandraLayerCollectionReader(val attributeStore: AttributeStore, instance: CassandraInstance) extends CollectionLayerReader[LayerId] {

  def read[
    K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
    V: AvroRecordCodec: ClassTag,
    M: JsonFormat: GetComponent[?, Bounds[K]]
  ](id: LayerId, rasterQuery: LayerQuery[K, M], filterIndexOnly: Boolean) = {
    if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)

    val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
      attributeStore.readLayerAttributes[CassandraLayerHeader, M, K](id)
    } catch {
      case e: AttributeNotFoundError => throw new LayerReadError(id).initCause(e)
    }

    val queryKeyBounds = rasterQuery(metadata)

    val decompose = (bounds: KeyBounds[K]) => keyIndex.indexRanges(bounds)

    val seq = CassandraCollectionReader.read[K, V](instance, header.keyspace, header.tileTable, id, queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema))
    new ContextCollection(seq, metadata)
  }
}

object CassandraLayerCollectionReader {
  def apply(instance: CassandraInstance): CassandraLayerCollectionReader =
    new CassandraLayerCollectionReader(CassandraAttributeStore(instance), instance)

  def apply(attributeStore: CassandraAttributeStore): CassandraLayerCollectionReader =
    new CassandraLayerCollectionReader(attributeStore, attributeStore.instance)
}
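Usage mirrors the Accumulo collection reader sketched earlier; a minimal, hypothetical example assuming a configured instance and the same illustrative key/value/metadata types:

// Hypothetical: `instance` is an already-configured CassandraInstance.
val creader = CassandraLayerCollectionReader(instance)
val layer = creader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](
  LayerId("example-layer", 11),
  new LayerQuery[SpatialKey, TileLayerMetadata[SpatialKey]],
  filterIndexOnly = false)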
(Diff truncated; the remaining changed files are not shown here.)
