NOTE: This project was an experiment conducted in 2013 to build an eventually consistent REST and JSON database using CRDTs, Akka Cluster and LevelDB. Some of its ideas has now been rolled in to Akka itself through the Akka Distributed Data module (which is production-grade).
The goal of this project is to provide a server-managed CRDT database build on top of Akka Cluster. We plan to implement most of the known CRDTs (Conflict-free Replicated Data Types), both CvRDTs and CmRDTs (see below for the difference). It is meant to be used both from within an Akka application and as a stand-alone REST service speaking JSON.
In short; a CRDT is a data type in which operations always commute and/or state changes always converge. CRDTs gives you very scalable eventual consistency "for free". Most CRDTs are a very limited data structures that does not fit all problems, but when they do, they really excel. There are multiple different CRDTs discovered: counters, sets, graphs etc. A data structure that is made up of CRDTs is also a CRDT, which makes it possible to create rich and advanced data structures that have the same nice properties.
A full outline of what CRDTs are all about is out of scope for this documentation. For good introduction to the subject read the excellent paper A comprehensive study of Convergent and Commutative Replicated Data Types by Mark Shapiro et. al.
// from within an Akka actor
import context._
// get the database extension
val db = ConvergentReplicatedDataTypeDatabase(system)
// find the 'users' GCounter
val nrOfUsers: Future[GCounter] = db.findById[GCounter]("users")
// increment the counter by 1 for this actor and then store it
nrOfUsers map { _ + self.path } foreach { _.store(system) }
NOTE: This is work in progress and is currently to be treated as a Proof of Concept. Apart from hardening and bug fixing etc. you can find some of the outstanding issues in the TODO list. If you find this project interesting please join us and help out.
The implementation is provided as an Akka Extension and build on top of Akka Cluster, which means that the cluster is fully elastic - allowing you to add and remove nodes on the fly as-needed. The storage system is pluggable with a default implementation of LevelDB (both native and Java port).
The CRDTs are immutable and can be queried, updated and managed either directly using the Akka Extension from within actors running on each node or from the "outside" through a REST API serving JSON over HTTP (see below for details). The serialisation protocol/format is pure JSON to make it possible to integrate with other CRDT client libraries (for example one in JavaScript - which is on the TODO list). The JSON library used internally is play-json and the REST API is implemented using Unfiltered. Testing is done with ScalaTest, Akka Testkit and Dispatch.
There are two different implementations:
- CvRDTs - Convergent Replicated Data Types (completed as PoC)
- CmRDTs - Commutative Replicated Data Types (to be implemented)
CvRDTs (Convergent Replicated Data Types) are state-based which means that every instance keeps the full history of all changes that have ever happened to the datastructure and is therefore self-contained and fault-tolerant by design. The convergent nature of the CvRDTs means that state updates do not need to obey any ordering requirements, the only requirement is that all state changes eventually reaches all replicas, and the system will converge.
In this implementation, which is is based on Akka Cluster and Akka Remote, the change sets are batched up (using a configurable batching window - default is 10 ms) and replicated out to the rest of the cluster nodes in an eventually consistent fashion. The eventual consistent nature of the system means that there will be a slight delay in the consistency between the nodes. Each batch is sent to all the nodes in the cluster and is then awaiting an ACK from each node. If no ACK is received then it will retry every T millis (configurable) indefinitely until either an ACK is received or the node is removed from the cluster.
CvRDTs have some really nice properties that makes them fairly straightforward to implement in a reliable fashion without having too hard requirements on the reliability of the underlying infrastructure. This makes them nice to work with in less reliable environments that allows clients to go offline, doing updates and then rejoin and merge in its state changes, on an ad-hoc basis. However, the simplicity in which state is managed—through keeping the full history in the CvRDT itself—has one major problem; keeping the growth of the history under control. Garbage collection of the history in a CvRDT, in a consistent fashion, across a cluster of nodes is a problem that is still under active research. This means that (for now) one have to either keep the update rate of the CvRDTs low in order to keep the history growth under control, or turn to CmRDTs (see next section).
Not implemented yet
CmRDT (Commutative Replicated Data Types) are operations-based which means that it is the state changing events, and not the state changes themselves that are stored. These events are persisted in an event/transaction log and a CmRDT is brought up to its current state by simply replaying the event log. This solves the problem with unbound growth of history, as seen in CvRDTs, since the history is kept in a event log, outside the CmRDT. It also opens up for optimisations like sharding.
CmRDTs require a reliable broadcast channel. This can be implemented either through a single highly-available replicated event log, or (perhaps more interesting) letting each replica set (group of cluster nodes) have its own event log as a form of sharding in order to avoid introducing a SPOF (single point of failure—unless a replicated highly-available log is used) and SPOB (single point of bottleneck). This approach also opens up for optimisations like storing a snapshot of the CmRDT and replaying from the snapshot etc.—as in classic Event Sourcing.
Another advantage is that it makes it a lot easier to create dynamic clusters where nodes can leave and join at any point in time. In CvRDTs you have to do full replication of the database including all its history when a new node joins, using buddy-replication or similar, while in the case of CmRDTs you just replay the transaction log from the latest snapshot.
A concrete implementation could based on a persistent transaction log architecture realised through the upcoming akka-persistence library.
You can create the ConvergentReplicatedDataTypeDatabase
Extension like this (from within an actor):
val db = ConvergentReplicatedDataTypeDatabase(context.system)
Find a new CRDT by id:
val nrOfUsers: Future[GCounter] = db.findById[GCounter]("users")
Shut down the database:
db.shutdown()
You can run the REST server in two different ways.
-
Run it in stand-alone mode from the command line. Here you run it by invoking
sbt run
in the project root directory. You configure it through JVM options, which will override the default configuration values. Example:sbt run -Dakka.crdt.rest-server.port=9009
. You shut down the server by invokingControl-C
which cleans up all resources and by default this destroys the LevelDB database (deletes the files), if you don't want this behaviour then start it up with-Dakka.crdt.convergent.leveldb.destroy-on-shutdown=off
. -
Embedded in an Akka application (see the 'Embedded Server' section below for details). To do this just create the extension using
val db = ConvergentReplicatedDataTypeDatabase(system)
and off you go. The REST server will be started automatically if theakka.crdt.rest-server.run
is set toon
.
Each CRDT has a read-only JSON view representation which is used in the REST API for querying data. For details on the REST API and the different JSON views see the section with the different CRDT descriptions below.
The REST client can contact any of the nodes in the cluster to read or write any data. Normally cluster would be deployed with a load balancer in front of it that can balance the load evenly across the cluster.
LevelDB is the default storage engine, you can configure it to look for a native installation and fall back to a Java port, or it will use the Java port directly. See the configuration file options for details on this and other config options.
Each node has its own LevelDB database. The database files are by default stored in the leveldb
directory in the root directory of the server, but this is configurable. The name of the database files needs to be unique on the machine you are running it on and is now prefixed with the hostname and port of the Akka Cluster node. Please note that if you don't specify the cluster port in the configuration file then a random one is chosen, which means that you will not be able to read in the same database between system restarts. Therefore, if this is important to you, you have to specify the cluster port explicitly in the configuration file or as JVM option when you boot up the system.
You can configure if the LevelDB database should be destroyed on node shutdown through the akka.crdt.convergent.leveldb.destroy-on-shutdown
option.
State-based
CvRDT descriptions below are taken from the MeanGirls README.
A nice property of CRDTs is that a data structure made up by CRDTs is also a CRDT. Which lets you create rich data structures from simple ones.
A g-counter
is a grow-only counter (inspired by vector clocks) in
which only increment and merge are possible. Incrementing the counter
adds 1 to the count for the current actor. Divergent histories are
resolved by taking the maximum count for each actor (like a vector
clock merge). The value of the counter is the sum of all actor
counts.
Create a g-counter
:
val nrOfUsers: GCounter = db.create[GCounter]("users")
Or:
val nrOfUsers: GCounter = GCounter("users")
nrOfUsers.store(context.system)
Find a g-counter
by id:
val nrOfUsers: Future[GCounter] = db.findById[GCounter]("users")
Find or Create a g-counter
:
val nrOfUsers: Future[GCounter] = db.findOrCreate[GCounter]("users")
Increment the counter by 1:
val nodeId = "some-unique-node-id"
val updatedNrOfUsers: GCounter = nrOfUsers + nodeId
Increment the counter with a delta:
val updatedNrOfUsers: GCounter = nrOfUsers + (nodeId, 5)
Get the value of the counter:
val count: Int = nrOfUsers.value
Merge two counters:
val mergedCounter: GCounter = thisCounter merge thatCounter
Store updates to a g-counter
:
// from within an actor
counter.store(system)
Or:
db.update(counter)
Get the view (the current value) of the counter:
val nrOfUsersView: GCounterView = nrOfUsers.view
Get JSON of the view:
val json: JsValue = nrOfUsersView.toJson
val jsonAsString: String = nrOfUsersView.toString
Get JSON of the counter (internal serialized representation):
val json: JsValue = nrOfUsers.toJson
val jsonAsString: String = nrOfUsers.toString
Create g-counter by id.
curl -i -H "Accept: application/json" -X PUT http://127.0.0.1:9009/g-counter/users
Create a g-counter with a random id.
curl -i -H "Accept: application/json" -X PUT http://127.0.0.1:9009/g-counter
Find g-counter by id.
curl -i -H "Accept: application/json" http://127.0.0.1:9009/g-counter/users
Increment the g-counter with 'delta'
curl -i -H "Accept: application/json" \
-X POST -d "node=darkstar" -d "delta=1" \
http://127.0.0.1:9009/g-counter/users
The JSON returned from the REST calls.
{
"type": "g-counter",
"id": "users",
"value": 1
}
This is the internal representation of a g-counter
:
{
"type": "g-counter",
"id": "users",
"state": {
"node1": 2,
"node2": 3
}
}
A pn-counter
allows the counter to be decreased by tracking the
increments (P) separate from the decrements (N), both represented as
internal G-Counters. Merge is handled by merging the internal P and N
counters. The value of the counter is the value of the P counter minus
the value of the N counter.
Create a pn-counter
:
val nrOfUsers: Future[PNCounter] = db.create[PNCounter]("users")
Or:
val nrOfUsers: PNCounter = PNCounter("users")
nrOfUsers.store(context.system)
Find a pn-counter
by id:
val nrOfUsers: Future[PNCounter] = db.findById[PNCounter]("users")
Increment the counter by 1:
val nodeId = "some-unique-node-id"
val updatedNrOfUsers: PNCounter = nrOfUsers + nodeId
Decrement the counter by 1:
val nodeId = "some-unique-node-id"
val updatedNrOfUsers: PNCounter = nrOfUsers - nodeId
Increment the counter with a delta:
val updatedNrOfUsers: PNCounter = nrOfUsers + (nodeId, 5)
Decrement the counter with a delta:
val updatedNrOfUsers: PNCounter = nrOfUsers - (nodeId, 7)
Get the value of the counter:
val count: Int = nrOfUsers.value
Merge two counters:
val mergedCounter: PNCounter = thisCounter merge thatCounter
Store updates to a pn-counter
:
// from within an actor
counter.store(system)
Or:
db.update(counter)
Get the view (the current value) of the counter:
val nrOfUsersView: PNCounterView = nrOfUsers.view
Get JSON of the view:
val json: JsValue = nrOfUsersView.toJson
val jsonAsString: String = nrOfUsersView.toString
Get JSON of the counter (internal serialized representation):
val json: JsValue = nrOfUsers.toJson
val jsonAsString: String = nrOfUsers.toString
Create pn-counter by id.
curl -i -H "Accept: application/json" -X PUT http://127.0.0.1:9009/pn-counter/users
Create a pn-counter with a random id.
curl -i -H "Accept: application/json" -X PUT http://127.0.0.1:9009/pn-counter
Find pn-counter by id.
curl -i -H "Accept: application/json" http://127.0.0.1:9009/pn-counter/users
Increment the pn-counter with 'delta' > 0
curl -i -H "Accept: application/json" \
-X POST -d "node=darkstar" -d "delta=1" \
http://127.0.0.1:9009/pn-counter/users
Decrement the pn-counter with 'delta' < 0
curl -i -H "Accept: application/json" \
-X POST -d "node=darkstar" -d "delta=-1" \
http://127.0.0.1:9009/pn-counter/users
The JSON returned from the REST calls.
{
"type": "pn-counter",
"id": "users",
"value": 1
}
This is the internal representation of a pn-counter
:
{
"type": "pn-counter",
"id": "users",
"increments": {
"type": "g-counter",
"id": "users/inc",
"state": {
"node1": 3,
"node2": 6
}
},
"decrements": {
"type": "g-counter",
"id": "users/dec",
"state": {
"node1": 2,
"node2": 2
}
}
}
Set union is commutative and convergent; hence it is always safe to have
simultaneous writes to a set which only allows addition. You cannot
remove an element of a g-set
. A GSet
can only contain JSON values
of type JsValue
(play-json).
Create a g-set
:
val users: Future[GSet] = db.create[GSet]("users")
Or:
val users: GSet = GSet("users")
users.store(context.system)
Find a g-set
by id:
val users: Future[GSet] = db.findById[GSet]("users")
Add JSON element to the set:
val user: JsValue = Json.parse("""{"username":"john","password":"coltrane"}""")
val updatedUsers: GSet = users + user
Get the value of the set:
val userSet: immutable.Set[JsValue] = users.value
Merge two sets:
val mergedSet: GSet = thisSet merge thatSet
Store updates to a g-set
:
// from within an actor
set.store(system)
Or:
db.update(set)
Get the view (the current value) of the set:
val usersView: GSetView = users.view
Get JSON of the view:
val json: JsValue = usersView.toJson
val jsonAsString: String = usersView.toString
Other methods on GSet
:
def contains(element: JsValue): Boolean
def foreach(f: JsValue ⇒ Unit): Unit
def isEmpty: Boolean
def size: Int
Get JSON of the set (internal serialized representation):
val json: JsValue = users.toJson
val jsonAsString: String = users.toString
Create g-set with specific id.
curl -i -H "Accept: application/json" -X PUT http://127.0.0.1:9009/g-set/users
Create g-set with random id.
curl -i -H "Accept: application/json" -X PUT http://127.0.0.1:9009/g-set
Find g-set by id.
curl -i -H "Accept: application/json" http://127.0.0.1:9009/g-set/users
Add JSON data to the g-set.
curl -i -H "Accept: application/json" \
-X POST -d "node=darkstar" -d "{"username":"john","password":"coltrane"}" \
http://127.0.0.1:9009/g-set/users/add
The JSON returned from the REST calls.
{
"type": "g-set",
"id": "users",
"value": [{
"username": "john",
"password": "coltrane"
}, {
"username": "charlie",
"password": "parker"
}
]
}
This is the internal representation of a g-set
:
{
"type": "g-set",
"id": "users",
"value": [{
"username": "john",
"password": "coltrane"
}, {
"username": "charlie",
"password": "parker"
}
]
}
2p-set
sets consist of two g-sets
: one for adding, and another for removing.
To add an element, add it to the add set A. To remove e, add e to the remove
set R. An element can only be added once, and only removed once. Elements can
only be removed if they are present in the set. Removes take precedence over adds.
A TwoPhaseSet
can only contain JSON values of type JsValue
(play-json).
Create a 2p-set
:
val users: Future[TwoPhaseSet] = db.create[TwoPhaseSet]("users")
Or:
val users: TwoPhaseSet = TwoPhaseSet("users")
users.store(context.system)
Find a 2p-set
by id:
val users: Future[TwoPhaseSet] = db.findById[TwoPhaseSet]("users")
Add JSON element to the set:
val user: JsValue = Json.parse("""{"username":"john","password":"coltrane"}""")
val updatedUsers: TwoPhaseSet = users + user
Remove a JSON element from the set:
val updatedUsers: TwoPhaseSet = users - user
Get the value of the set:
val userSet: immutable.Set[JsValue] = users.value
Merge two sets:
val mergedSet: TwoPhaseSet = thisSet merge thatSet
Store updates to a 2p-set
:
// from within an actor
set.store(system)
Or:
db.update(set)
Get the view (the current value) of the set:
val usersView: TwoPhaseSetView = users.view
Get JSON of the view:
val json: JsValue = usersView.toJson
Get JSON of the view:
val json: JsValue = usersView.toJson
val jsonAsString: String = usersView.toString
Other methods on TwoPhaseSet
:
def contains(element: JsValue): Boolean
def foreach(f: JsValue ⇒ Unit): Unit
def isEmpty: Boolean
def size: Int
Get JSON of the set (internal serialized representation):
val json: JsValue = users.toJson
val jsonAsString: String = users.toString
Create 2p-set with specific id.
curl -i -H "Accept: application/json" -X PUT http://127.0.0.1:9009/2p-set/users
Create 2p-set with random id.
curl -i -H "Accept: application/json" -X PUT http://127.0.0.1:9009/2p-set
FInd a 2p-set by id.
curl -i -H "Accept: application/json" http://127.0.0.1:9009/2P-Set-set/users
Add JSON data to the 2p-set.
curl -i -H "Accept: application/json" \
-X POST -d "node=darkstar" -d "{"username":"john","password":"coltrane"}" \
http://127.0.0.1:9009/2p-set/users/add
Remove JSON data from the 2p-set.
curl -i -H "Accept: application/json" \
-X POST -d "node=darkstar" -d "{"username":"john","password":"coltrane"}" \
http://127.0.0.1:9009/2p-set/users/remove
The JSON returned from the REST calls.
{
"type": "2p-set",
"id": "users",
"value": [{
"username": "john",
"password": "coltrane"
}, {
"username": "charlie",
"password": "parker"
}
]
}
This is the internal representation of a 2p-set
:
{
"type": "2p-set",
"id": "users",
"adds": {
"type": "g-set",
"id": "users/adds",
"state": [{
"username": "john",
"password": "coltrane"
}, {
"username": "sonny",
"password": "rollins"
}, {
"username": "charlie",
"password": "parker"
}
]
},
"removes": {
"type": "g-set",
"id": "users/removes",
"state": [{
"username": "sonny",
"password": "rollins"
}
]
}
}
Operations-based
To be implemented.
All changes (newly merged CRDTs) are published to Akka Event Bus. If you are interested in getting these events you can just subscribe to them. Here is an example:
val listener = system.actorOf(Props(new Actor {
override def preStart(): Unit =
context.system.eventStream.subscribe(self, classOf[ConvergentReplicatedDataType])
override def postStop(): Unit =
context.system.eventStream.unsubscribe(self)
def receive = {
case updatedCRDT: ConvergentReplicatedDataType ⇒
// handle the updated CRDT
}
}))
This is the configuration where you can configure the REST server, backend storage systems etc.
akka {
crdt {
rest-server {
run = on
hostname = "127.0.0.1"
port = 9009
}
convergent {
batching-window = 10ms
change-set-resubmission-interval = 1000ms
# needs to implement the 'akka.crdt.convergent.Storage' trait
storage-class = akka.crdt.convergent.LevelDbStorage
# if native version is found that it is used -
# else it falls back to Java port of LevelDB
leveldb {
storage-path = "./leveldb" # directory for the database storage files
destroy-on-shutdown = off # deletes the database files for the
# specific node on shutdown
use-fsync = off # use fsync on write
verify-checksums = off # verify checksums on write
use-native = off # try to find native LevelDB, if not
# found then Java port will be used
cache-size = 104857600 # max size of the in-memory cache
}
}
commutative {
}
}
}