Scromium is a Scala library for dealing with the utter shit show that is the Cassandra Thrift API. Assuming you use SBT, pull Scromium into your project thusly:
val scromiumRepo = "Cliff's Scromium Repo" at "http://cliffmoon.github.com/scromium/repository/"
val scromium = "scromium" %% "scromium" % "0.6.4" artifacts
Artifact("scromium-all_2.8.0", "all", "jar")
Why the extra artifact? We'll get to that.
- Chainable API for building up queries and inserts.
- Exposes most (not all) of Cassandra's thrift API in a typesafe manner.
- Implicit serialization and deserialization APIs make it easy to define conversions for custom classes.
- Integrated test harness that will reliably start up and shut down an embedded Cassandra instance in your tests.
- Pluggable connection pool with reliable connection handling, including retries for downed servers.
- JSON based configuration.
Starting with version 0.6, the minor version number is pinned to Cassandra's, making it easy to know which versions of Cassandra are compatible with Scromium.
Scromium has two different artifacts available in the main repository. The default artifact is simply a jar with the main Scromium code. Its transitive dependencies are declared in the .pom; however, there are a few dependencies it relies upon that are not managed by any Maven repository. These unmanaged dependencies mostly concern the Thrift interface and running the embedded Cassandra node.
Pulling in the thin Jar:
val scromiumRepo = "Cliff's Scromium Repo" at "http://cliffmoon.github.com/scromium/repository/"
val scromium = "scromium" % "scromium_2.8.0.Beta1" % "0.6.1"
There is also a fat jar available as scromium-all. It includes a patched version of Cassandra along with its unmanaged dependencies.
Pulling in the fat Jar:
val scromiumRepo = "Cliff's Scromium Repo" at "http://cliffmoon.github.com/scromium/repository/"
val scromium = "scromium" % "scromium_2.8.0.Beta1" % "0.6.1" artifacts Artifact("scromium-all_2.8.0.Beta1", "all", "jar")
The way shit is supposed to work:
First we need to start Scromium, which involves reading in the configuration and spinning up a connection pool. Start will take either no arguments, a file, or a Map[String,Any], depending on how you want to deliver Scromium's configuration.
import scromium._
import scromium.api._
import scromium.serializers.Serializers._
val conf = Map("connectionPool" -> "scromium.connection.CommonsConnectionPool",
               "seedHost" -> "127.0.0.1",
               "seedPort" -> 9160,
               "maxIdle" -> 10,
               "initCapacity" -> 10,
               "clientProvider" -> "scromium.thrift.ThriftClientProvider")
val cassandra = Cassandra.start(conf)
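Per the above, start can also be invoked with no arguments or handed the path to a JSON config file. A sketch (the filename here is made up):

// with no arguments
val cassandra = Cassandra.start()

// or from a JSON config file (example filename)
val cassandra = Cassandra.start("scromium.json")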
Most of the API is chainable, except where chaining either is not possible or does not make sense. The idea is to be able to build up queries and inserts naturally. This piece of code opens the keyspace "fuck" for some operations and passes that keyspace into the resulting block:
cassandra.keyspace("fuck") { ks =>
Consistency levels can be delivered via implicit parameters. This saves a lot of typing if you know a block of code will all be using the same read or write consistency level.
implicit val consistency = ReadConsistency.One
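Putting those pieces together, here is a sketch (built from the same calls used in the examples below) of a block that pins both consistency levels once:

cassandra.keyspace("Keyspace") { ks =>
  // every read and write inside this block picks these up implicitly
  implicit val r = ReadConsistency.One
  implicit val w = WriteConsistency.One
  ks.columnFamily("ColumnFamily") { cf =>
    cf.batch { put =>
      put.row("row1").insert("c1", "value")
      cf.put(put)
    }
    val column = cf.getColumn("row1", "c1") // no consistency argument needed
  }
}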
The get API is oriented around retrieving either a single cell or a single supercolumn. You pass parent objects to sub codeblocks to do stuff. Pretty self-explanatory:
cassandra.keyspace("Keyspace") { ks =>
//Get Methods
//for Column
ks.columnFamily("ColumnFamily") { cf =>
val column = cf.getColumn("row1", "c1").get
}
//for SuperColumn
ks.superColumnFamily("SuperColumnFamily") { cf =>
val sc = cf.getSuperColumn("row1", "sc1").get
val c = cf.getSubColumn("row1", "sc1", "c1").get
}
}
Range queries are performed via the range and rangeSuper methods of Keyspace. They require the OrderPreservingPartitioner, so they will fail against an incorrectly configured Cassandra node.
cassandra.keyspace("Keyspace") { ks =>
ks.superColumnFamily("SuperColumnFamily") { scf =>
val slice = Slice("fuck", "shit")
val rows = List("a".getBytes, "b".getBytes, "c".getBytes)
val selector = new Selector(rows).slice(slice)
val iterColumns = scf.get(selector)
}
}
Inserts and batch inserts are pretty simple as well:
cassandra.keyspace("Keyspace") { ks =>
ks.columnFamily("ColumnFamily") { cf =>
cf.batch { put =>
val r = put.row("row1")
r.insert("c1", "value")
//.. add more columns/rows to batch it if you want
cf.put(put)
}
}
}
Multigets are, again, really really simple:
cassandra.keyspace("Keyspace") { ks =>
ks.columnFamily("ColumnFamily") { cf =>
val results = cf.get(cf.selector(List("row1", "row2")).columns(List("c1", "c2", "c3"))).toList
println("results " + results)
}
}
Throughout all of these examples, not much attention has been paid to the types of the columns, supercolumns, and values. Cassandra deals with these internally as opaque byte arrays. Scromium provides a serialization API with a few presets defined. This serialization API should make it easy for you to write conversions from your own objects into and back out of byte arrays.
Serializers are provided to the API as implicit parameters. Scala will automatically choose the most specific Serializer or Deserializer available at the call site.
You can define your own Serializers:
implicit object FuckSerializer extends Serializer[Fuck] {
  def serialize(fuck : Fuck) = Array[Byte](0)
  def deserialize(ary : Array[Byte]) = new Fuck
}
And that will allow you to give a Fuck and get a Fuck as long as that implicit object is in scope.
cassandra.keyspace("Keyspace") { ks => {
ks.columnFamily("ColumnFamily") { cf =>
cf.batch { put =>
val r = put.row("row")
r.insert("c1", new Fuck)
cf.put(put)
}
val myFuck = cf.getColumn("row", "c1").get.valueAs[Fuck]
// Simple as Fuck!
}
}
The API of Scromium actively resists testing via mocking. Also, I believe mocking in this case is dumb: when you mock something like a database, what are you testing against? Certainly not the behavior of the actual database. You are testing against your assumptions about how the database ought to work. We are in the JVM. Why reimplement Cassandra using a mocking API? We should simply lean into it and embed a Cassandra node in our tests.
import org.specs._
import scromium._
import scromium.serializers.Serializers._

class FuckSpecification extends Specification {
  var cassandra : Cassandra = null

  doBefore { cassandra = Cassandra.startTest }
  doAfter { cassandra.teardownTest }

  "Fuck" should {
    "do some shit" in {
      cassandra.keyspace("fuck_shit") { ks =>
        implicit val w = WriteConsistency.One
        implicit val r = ReadConsistency.Any
        ks.insert("its", "a" -> "stack" -> "of", "fuckshit")
        val result = ks.get("its", "a") % "stack" / "of" !;
        result.valueAs[String] must be("fuckshit")
      }
    }
  }
}
That test should green bar.
The one caveat with running Cassandra in this way is that it starts up a shit ton of threads, and once we pull the rug out from under them in the teardown method, they will eventually start vomiting to the console. They are harmless, mind you, but something to be aware of. Ghosts of instantiation past.
Scromium is configured via a Map[String,Any]. This configuration can be delivered through a variety of means. It can either be passed directly into Cassandra.start(map : Map[String,Any]) or it can be supplied as a JSON file passed in to Cassandra.start(file : String).
The configuration simply consists of the class name of a ConnectionPool implementation and the options that should be passed to it. The default is the CommonsConnectionPool, which uses the Apache Commons stack object pool as its implementation. There is also an ActorConnectionPool, which wraps each connection in an individual actor for automatic asynchronous execution.
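As a sketch, the Map-based configuration from earlier could be written as a JSON file instead (the keys mirror the Map example above; to use the ActorConnectionPool instead, swap in its class name as the connectionPool value):

{
  "connectionPool" : "scromium.connection.CommonsConnectionPool",
  "seedHost" : "127.0.0.1",
  "seedPort" : 9160,
  "maxIdle" : 10,
  "initCapacity" : 10,
  "clientProvider" : "scromium.thrift.ThriftClientProvider"
}

Pass that file's path to Cassandra.start(file : String) and you're off.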